From commits-return-84942-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Fri Apr 12 07:09:26 2019
Return-Path: <commits-return-84942-archive-asf-public=cust-asf.ponee.io@hbase.apache.org>
X-Original-To: archive-asf-public@cust-asf.ponee.io
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [207.244.88.153])
by mx-eu-01.ponee.io (Postfix) with SMTP id E688C180763
for <archive-asf-public@cust-asf.ponee.io>; Fri, 12 Apr 2019 09:09:22 +0200 (CEST)
Received: (qmail 32648 invoked by uid 500); 12 Apr 2019 07:09:14 -0000
Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm
Precedence: bulk
List-Help: <mailto:commits-help@hbase.apache.org>
List-Unsubscribe: <mailto:commits-unsubscribe@hbase.apache.org>
List-Post: <mailto:commits@hbase.apache.org>
List-Id: <commits.hbase.apache.org>
Reply-To: dev@hbase.apache.org
Delivered-To: mailing list commits@hbase.apache.org
Received: (qmail 32262 invoked by uid 99); 12 Apr 2019 07:09:14 -0000
Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70)
by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Apr 2019 07:09:14 +0000
Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33)
id 51F97814C8; Fri, 12 Apr 2019 07:09:14 +0000 (UTC)
Date: Fri, 12 Apr 2019 07:09:24 +0000
To: "commits@hbase.apache.org"
Subject: [hbase] 12/13: HBASE-21717 Implement Connection based on AsyncConnection
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
From: zhangduo@apache.org
In-Reply-To: <155505295272.4969.17653673293982837413@gitbox.apache.org>
References: <155505295272.4969.17653673293982837413@gitbox.apache.org>
X-Git-Host: gitbox.apache.org
X-Git-Repo: hbase
X-Git-Refname: refs/heads/HBASE-21512
X-Git-Reftype: branch
X-Git-Rev: 40ff92e5afa8a8b503b1e1209092135d913014dd
X-Git-NotificationType: diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated
Message-Id: <20190412070914.51F97814C8@gitbox.apache.org>
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 40ff92e5afa8a8b503b1e1209092135d913014dd
Author: Duo Zhang <zhangduo@apache.org>
AuthorDate: Thu Mar 7 11:51:51 2019 +0800
HBASE-21717 Implement Connection based on AsyncConnection
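In short: this patch bridges the synchronous and asynchronous client APIs. AsyncConnection gains
toConnection(), Connection gains toAsyncConnection(), and the new ConnectionOverAsyncConnection /
TableOverAsyncTable classes implement the blocking Connection/Table interfaces on top of the async
client. ConnectionFactory.createConnection now returns the async-backed implementation by default,
unless hbase.client.connection.impl names another class. For illustration only (not part of the
patch), a minimal usage sketch of the round trip; the table name "test" and row key "row" are
assumptions:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.FutureUtils;

    public class ConnectionBridgeSketch {
      public static void main(String[] args) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create())) {
          // The blocking Table view is now served by an AsyncTable underneath.
          try (Table table = conn.getTable(TableName.valueOf("test"))) {
            table.get(new Get(Bytes.toBytes("row")));
          }
          // Bridging is cheap; repeated calls usually hand back the same instance.
          AsyncConnection async = conn.toAsyncConnection();
          boolean exists = FutureUtils.get(async.getAdmin().tableExists(TableName.valueOf("test")));
          System.out.println("table exists: " + exists);
        }
      }
    }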
---
.../apache/hadoop/hbase/backup/TestBackupBase.java | 5 +-
.../hadoop/hbase/backup/TestBackupMerge.java | 11 +-
.../hbase/backup/TestBackupMultipleDeletes.java | 10 +-
.../hadoop/hbase/backup/TestIncrementalBackup.java | 16 +-
.../backup/TestIncrementalBackupDeleteTable.java | 10 +-
.../TestIncrementalBackupMergeWithFailures.java | 7 +-
.../backup/TestIncrementalBackupWithBulkLoad.java | 6 +-
.../backup/TestIncrementalBackupWithFailures.java | 6 +-
.../hadoop/hbase/backup/TestRemoteBackup.java | 8 +-
.../hbase/backup/master/TestBackupLogCleaner.java | 6 +-
.../hadoop/hbase/client/AsyncConnection.java | 8 +
.../hadoop/hbase/client/AsyncConnectionImpl.java | 49 +-
.../hbase/client/AsyncRegionLocatorHelper.java | 3 +-
.../hbase/client/AsyncTableResultScanner.java | 4 +
.../org/apache/hadoop/hbase/client/Connection.java | 17 +-
.../hadoop/hbase/client/ConnectionFactory.java | 44 +-
.../hbase/client/ConnectionImplementation.java | 54 +-
.../client/ConnectionOverAsyncConnection.java | 180 +++++++
.../hadoop/hbase/client/ConnectionUtils.java | 103 ++--
.../org/apache/hadoop/hbase/client/HBaseAdmin.java | 5 +-
.../org/apache/hadoop/hbase/client/HTable.java | 147 +-----
.../client/RegionCoprocessorRpcChannelImpl.java | 37 +-
.../java/org/apache/hadoop/hbase/client/Scan.java | 5 +
.../java/org/apache/hadoop/hbase/client/Table.java | 545 +++++----------------
.../hadoop/hbase/client/TableOverAsyncTable.java | 527 ++++++++++++++++++++
.../hadoop/hbase/ipc/CoprocessorRpcChannel.java | 12 +-
.../apache/hadoop/hbase/client/SimpleRegistry.java | 83 ++++
.../hadoop/hbase/client/TestAsyncProcess.java | 2 +-
.../hadoop/hbase/client/TestBufferedMutator.java | 14 +-
.../hadoop/hbase/client/TestClientNoCluster.java | 33 +-
.../hbase/mapreduce/TestHFileOutputFormat2.java | 31 +-
.../mapreduce/TestMultiTableInputFormatBase.java | 6 +
.../hbase/mapreduce/TestTableInputFormatBase.java | 6 +
.../org/apache/hadoop/hbase/rest/ResourceBase.java | 16 +-
.../apache/hadoop/hbase/rest/SchemaResource.java | 18 +-
.../hadoop/hbase/rest/client/RemoteHTable.java | 268 +++-------
.../hadoop/hbase/rest/TestScannerResource.java | 10 +-
.../hadoop/hbase/rest/client/TestRemoteTable.java | 7 +-
.../hbase/client/SharedAsyncConnection.java} | 109 ++---
.../hbase/{ => client}/SharedConnection.java | 18 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 5 +-
.../hadoop/hbase/master/MasterCoprocessorHost.java | 2 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 68 +--
.../hbase/regionserver/RegionCoprocessorHost.java | 2 +-
.../regionserver/RegionServerCoprocessorHost.java | 2 +-
.../hbase/security/access/AccessController.java | 13 +-
.../apache/hadoop/hbase/util/MultiHConnection.java | 141 ------
.../main/resources/hbase-webapps/master/table.jsp | 5 +-
.../apache/hadoop/hbase/HBaseTestingUtility.java | 95 ++--
.../hbase/TestPartialResultsFromClientSide.java | 23 +-
.../TestServerSideScanMetricsFromClientSide.java | 51 +-
.../example/TestZooKeeperTableArchiveClient.java | 19 +-
.../client/AbstractTestCIOperationTimeout.java | 4 +-
.../hbase/client/AbstractTestCIRpcTimeout.java | 2 +-
.../hadoop/hbase/client/AbstractTestCITimeout.java | 2 +-
.../hbase/client/DummyAsyncClusterConnection.java | 5 +
.../org/apache/hadoop/hbase/client/TestAdmin1.java | 24 +-
.../org/apache/hadoop/hbase/client/TestAdmin2.java | 17 +-
.../hbase/client/TestAlwaysSetScannerId.java | 29 +-
.../hbase/client/TestAsyncTableAdminApi.java | 2 +-
.../TestAvoidCellReferencesIntoShippedBlocks.java | 7 +
.../hadoop/hbase/client/TestCIBadHostname.java | 28 +-
.../apache/hadoop/hbase/client/TestCISleep.java | 71 +--
.../hadoop/hbase/client/TestCheckAndMutate.java | 30 +-
.../hadoop/hbase/client/TestClientPushback.java | 214 ++++----
.../hbase/client/TestConnectionImplementation.java | 3 +
.../hadoop/hbase/client/TestFromClientSide.java | 182 +++----
.../hadoop/hbase/client/TestFromClientSide3.java | 89 ++--
.../client/TestFromClientSideScanExcpetion.java | 9 +-
.../hbase/client/TestGetProcedureResult.java | 7 +-
.../hbase/client/TestIncrementsFromClientSide.java | 61 +--
.../hadoop/hbase/client/TestLeaseRenewal.java | 136 -----
.../hbase/client/TestMalformedCellFromClient.java | 36 +-
.../apache/hadoop/hbase/client/TestMetaCache.java | 5 +
.../hadoop/hbase/client/TestMetaWithReplicas.java | 9 +-
.../client/TestMultiActionMetricsFromClient.java | 13 +-
.../hadoop/hbase/client/TestMultiParallel.java | 108 ++--
.../hbase/client/TestMultiRespectsLimits.java | 21 +-
.../hbase/client/TestRegionLocationCaching.java | 5 +
.../hbase/client/TestReplicaWithCluster.java | 8 +-
.../hadoop/hbase/client/TestReplicasClient.java | 38 +-
.../hbase/client/TestScanWithoutFetchingData.java | 27 +-
.../hbase/client/TestScannersFromClientSide.java | 168 +------
.../hbase/client/TestSeparateClientZKCluster.java | 48 +-
.../hbase/client/TestShortCircuitConnection.java | 95 ----
...C.java => TestCoprocessorSharedConnection.java} | 23 +-
.../TestPassCustomCellViaRegionObserver.java | 5 +-
.../hbase/filter/TestMultiRowRangeFilter.java | 45 +-
.../hadoop/hbase/master/TestMasterShutdown.java | 21 +-
.../hadoop/hbase/master/TestWarmupRegion.java | 4 +-
.../hadoop/hbase/regionserver/RegionAsTable.java | 120 +----
.../regionserver/TestEndToEndSplitTransaction.java | 2 +-
.../hbase/regionserver/TestHRegionFileSystem.java | 17 +-
.../TestNewVersionBehaviorFromClientSide.java | 7 +-
.../regionserver/TestPerColumnFamilyFlush.java | 7 -
.../regionserver/TestRegionServerMetrics.java | 91 ++--
.../regionserver/TestScannerHeartbeatMessages.java | 5 +
.../TestSettingTimeoutOnBlockingPoint.java | 14 +-
.../hbase/replication/TestReplicationBase.java | 2 +-
.../hbase/replication/TestReplicationWithTags.java | 2 +-
.../TestGlobalReplicationThrottler.java | 2 +-
.../TestCoprocessorWhitelistMasterObserver.java | 8 +-
...tVisibilityLabelReplicationWithExpAsString.java | 2 +-
.../TestVisibilityLabelsReplication.java | 2 +-
.../hbase/snapshot/TestRegionSnapshotTask.java | 2 +-
.../apache/hadoop/hbase/tool/TestCanaryTool.java | 3 +-
.../hadoop/hbase/util/MultiThreadedAction.java | 5 +-
.../util/hbck/OfflineMetaRebuildTestCore.java | 7 +-
.../hbase/thrift/ThriftHBaseServiceHandler.java | 12 +-
.../hbase/thrift2/ThriftHBaseServiceHandler.java | 2 +-
.../hbase/thrift2/client/ThriftConnection.java | 6 +
.../hadoop/hbase/thrift2/client/ThriftTable.java | 4 +-
.../hadoop/hbase/thrift2/TestThriftConnection.java | 2 +-
113 files changed, 2134 insertions(+), 2663 deletions(-)
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 2afdb4f..e0fca20 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
@@ -354,9 +353,9 @@ public class TestBackupBase {
TEST_UTIL.shutdownMiniMapReduceCluster();
}
- HTable insertIntoTable(Connection conn, TableName table, byte[] family, int id, int numRows)
+ Table insertIntoTable(Connection conn, TableName table, byte[] family, int id, int numRows)
throws IOException {
- HTable t = (HTable) conn.getTable(table);
+ Table t = conn.getTable(table);
Put p1;
for (int i = 0; i < numRows; i++) {
p1 = new Put(Bytes.toBytes("row-" + table + "-" + id + "-" + i));
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
index 8ead548..beacef3 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.backup;
import static org.junit.Assert.assertTrue;
import java.util.List;
-
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
@@ -28,10 +27,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -39,6 +36,8 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
@Category(LargeTests.class)
public class TestBackupMerge extends TestBackupBase {
@@ -72,14 +71,14 @@ public class TestBackupMerge extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table1
- HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
t1.close();
LOG.debug("written " + ADD_ROWS + " rows to " + table1);
- HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
+ Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
t2.close();
@@ -115,7 +114,7 @@ public class TestBackupMerge extends TestBackupBase {
tablesRestoreIncMultiple, tablesMapIncMultiple, true));
Table hTable = conn.getTable(table1_restore);
- LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+ LOG.debug("After incremental restore: " + hTable.getDescriptor());
int countRows = TEST_UTIL.countRows(hTable, famName);
LOG.debug("f1 has " + countRows + " rows");
Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, countRows);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMultipleDeletes.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMultipleDeletes.java
index db1a4e2..bffa480 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMultipleDeletes.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMultipleDeletes.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
@@ -67,7 +67,7 @@ public class TestBackupMultipleDeletes extends TestBackupBase {
String backupIdFull = client.backupTables(request);
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table table1
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
@@ -82,7 +82,7 @@ public class TestBackupMultipleDeletes extends TestBackupBase {
String backupIdInc1 = client.backupTables(request);
assertTrue(checkSucceeded(backupIdInc1));
// #4 - insert some data to table table2
- HTable t2 = (HTable) conn.getTable(table2);
+ Table t2 = conn.getTable(table2);
Put p2 = null;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p2 = new Put(Bytes.toBytes("row-t2" + i));
@@ -95,7 +95,7 @@ public class TestBackupMultipleDeletes extends TestBackupBase {
String backupIdInc2 = client.backupTables(request);
assertTrue(checkSucceeded(backupIdInc2));
// #6 - insert some data to table table1
- t1 = (HTable) conn.getTable(table1);
+ t1 = conn.getTable(table1);
for (int i = NB_ROWS_IN_BATCH; i < 2 * NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
@@ -107,7 +107,7 @@ public class TestBackupMultipleDeletes extends TestBackupBase {
String backupIdInc3 = client.backupTables(request);
assertTrue(checkSucceeded(backupIdInc3));
// #8 - insert some data to table table2
- t2 = (HTable) conn.getTable(table2);
+ t2 = conn.getTable(table2);
for (int i = NB_ROWS_IN_BATCH; i < 2 * NB_ROWS_IN_BATCH; i++) {
p2 = new Put(Bytes.toBytes("row-t1" + i));
p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index 749839c..35a77ea 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -32,8 +32,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -101,7 +101,7 @@ public class TestIncrementalBackup extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table
- HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
Assert.assertEquals(HBaseTestingUtility.countRows(t1),
NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3);
@@ -115,7 +115,7 @@ public class TestIncrementalBackup extends TestBackupBase {
Assert.assertEquals(HBaseTestingUtility.countRows(t1),
NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_MOB);
- HTable t2 = (HTable) conn.getTable(table2);
+ Table t2 = conn.getTable(table2);
Put p2;
for (int i = 0; i < 5; i++) {
p2 = new Put(Bytes.toBytes("row-t2" + i));
@@ -162,7 +162,7 @@ public class TestIncrementalBackup extends TestBackupBase {
HBaseTestingUtility.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
int NB_ROWS_FAM2 = 7;
- HTable t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+ Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
t3.close();
// Wait for 5 sec to make sure that old WALs were deleted
@@ -188,11 +188,11 @@ public class TestIncrementalBackup extends TestBackupBase {
hAdmin.close();
// #6.2 - checking row count of tables for full restore
- HTable hTable = (HTable) conn.getTable(table1_restore);
+ Table hTable = conn.getTable(table1_restore);
Assert.assertEquals(HBaseTestingUtility.countRows(hTable), NB_ROWS_IN_BATCH + NB_ROWS_FAM3);
hTable.close();
- hTable = (HTable) conn.getTable(table2_restore);
+ hTable = conn.getTable(table2_restore);
Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtility.countRows(hTable));
hTable.close();
@@ -201,7 +201,7 @@ public class TestIncrementalBackup extends TestBackupBase {
TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2,
false, tablesRestoreIncMultiple, tablesMapIncMultiple, true));
- hTable = (HTable) conn.getTable(table1_restore);
+ hTable = conn.getTable(table1_restore);
LOG.debug("After incremental restore: " + hTable.getDescriptor());
int countFamName = TEST_UTIL.countRows(hTable, famName);
@@ -217,7 +217,7 @@ public class TestIncrementalBackup extends TestBackupBase {
Assert.assertEquals(countMobName, NB_ROWS_MOB);
hTable.close();
- hTable = (HTable) conn.getTable(table2_restore);
+ hTable = conn.getTable(table2_restore);
Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtility.countRows(hTable));
hTable.close();
admin.close();
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java
index f8129d9..08834f2 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
@@ -75,7 +75,7 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table table1
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
@@ -110,11 +110,11 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase {
assertTrue(hAdmin.tableExists(table2_restore));
// #5.2 - checking row count of tables for full restore
- HTable hTable = (HTable) conn.getTable(table1_restore);
+ Table hTable = conn.getTable(table1_restore);
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH);
hTable.close();
- hTable = (HTable) conn.getTable(table2_restore);
+ hTable = conn.getTable(table2_restore);
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH);
hTable.close();
@@ -124,7 +124,7 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase {
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple,
false, tablesRestoreIncMultiple, tablesMapIncMultiple, true));
- hTable = (HTable) conn.getTable(table1_restore);
+ hTable = conn.getTable(table1_restore);
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2);
hTable.close();
admin.close();
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
index 57bdc46..7351258 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
@@ -245,14 +244,14 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table1
- HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
t1.close();
LOG.debug("written " + ADD_ROWS + " rows to " + table1);
- HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
+ Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
t2.close();
@@ -334,7 +333,7 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
tablesRestoreIncMultiple, tablesMapIncMultiple, true));
Table hTable = conn.getTable(table1_restore);
- LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+ LOG.debug("After incremental restore: " + hTable.getDescriptor());
LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
index 82f0fb7..4b02077 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.TestBulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
@@ -79,7 +79,7 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table table1
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
@@ -127,7 +127,7 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple1,
false, tablesRestoreIncMultiple, tablesRestoreIncMultiple, true));
- HTable hTable = (HTable) conn.getTable(table1);
+ Table hTable = conn.getTable(table1);
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2 + actual + actual1);
request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithFailures.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithFailures.java
index d5829b2..f6725d9 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithFailures.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithFailures.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hbase.backup.impl.TableBackupClient.Stage;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
@@ -100,14 +100,14 @@ public class TestIncrementalBackupWithFailures extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table
- HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3);
t1.close();
LOG.debug("written " + ADD_ROWS + " rows to " + table1);
- HTable t2 = (HTable) conn.getTable(table2);
+ Table t2 = conn.getTable(table2);
Put p2;
for (int i = 0; i < 5; i++) {
p2 = new Put(Bytes.toBytes("row-t2" + i));
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRemoteBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRemoteBackup.java
index a0226e6..05826e2 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRemoteBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRemoteBackup.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -78,7 +78,7 @@ public class TestRemoteBackup extends TestBackupBase {
} catch (InterruptedException ie) {
}
try {
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_FAM3; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
@@ -102,7 +102,7 @@ public class TestRemoteBackup extends TestBackupBase {
HBaseTestingUtility.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
SnapshotTestingUtils.loadData(TEST_UTIL, table1, 50, fam2Name);
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
int rows0 = MobSnapshotTestingUtils.countMobRows(t1, fam2Name);
latch.countDown();
@@ -130,7 +130,7 @@ public class TestRemoteBackup extends TestBackupBase {
assertTrue(hAdmin.tableExists(table1_restore));
// #5.2 - checking row count of tables for full restore
- HTable hTable = (HTable) conn.getTable(table1_restore);
+ Table hTable = conn.getTable(table1_restore);
Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH);
int cnt3 = TEST_UTIL.countRows(hTable, fam3Name);
Assert.assertTrue(cnt3 >= 0 && cnt3 <= NB_ROWS_IN_FAM3);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
index 9273487..6b8011e 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.backup.TestBackupBase;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -107,7 +107,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
assertTrue(walFiles.size() < newWalFiles.size());
Connection conn = ConnectionFactory.createConnection(conf1);
// #2 - insert some data to table
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
@@ -117,7 +117,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
t1.close();
- HTable t2 = (HTable) conn.getTable(table2);
+ Table t2 = conn.getTable(table2);
Put p2;
for (int i = 0; i < 5; i++) {
p2 = new Put(Bytes.toBytes("row-t2" + i));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 75971ad..0546520 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -202,6 +202,14 @@ public interface AsyncConnection extends Closeable {
boolean isClosed();
/**
+ * Convert this connection to a {@link Connection}.
+ *
+ * Usually the same instance will be returned if you call this method multiple times, so this
+ * can be considered a lightweight operation.
+ */
+ Connection toConnection();
+
+ /**
* Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
* be thread-safe. A new instance should be created by each thread. This is a lightweight
* operation. Pooling or caching of the returned Hbck instance is not recommended.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 9bdfbe4..5133dc1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLE
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -104,6 +105,8 @@ class AsyncConnectionImpl implements AsyncConnection {
private volatile boolean closed = false;
+ private volatile ConnectionOverAsyncConnection conn;
+
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
SocketAddress localAddress, User user) {
this.conf = conf;
@@ -138,6 +141,11 @@ class AsyncConnectionImpl implements AsyncConnection {
}
@Override
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
public void close() {
// As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a
// simple volatile flag.
@@ -149,17 +157,21 @@ class AsyncConnectionImpl implements AsyncConnection {
if (authService != null) {
authService.shutdown();
}
+ ConnectionOverAsyncConnection c = this.conn;
+ if (c != null) {
+ c.closeConnImpl();
+ }
closed = true;
}
@Override
- public boolean isClosed() {
- return closed;
+ public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
+ return new AsyncTableRegionLocatorImpl(tableName, this);
}
@Override
- public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
- return new AsyncTableRegionLocatorImpl(tableName, this);
+ public void clearRegionLocationCache() {
+ locator.clearCache();
}
// we will override this method for testing retry caller, so do not remove this method.
@@ -283,6 +295,30 @@ class AsyncConnectionImpl implements AsyncConnection {
}
@Override
+ public Connection toConnection() {
+ ConnectionOverAsyncConnection c = this.conn;
+ if (c != null) {
+ return c;
+ }
+ synchronized (this) {
+ c = this.conn;
+ if (c != null) {
+ return c;
+ }
+ try {
+ c = new ConnectionOverAsyncConnection(this,
+ ConnectionFactory.createConnectionImpl(conf, null, user));
+ } catch (IOException e) {
+ // TODO: finally we will not rely on ConnectionImplementation anymore and there will no
+ // IOException here.
+ throw new UncheckedIOException(e);
+ }
+ this.conn = c;
+ }
+ return c;
+ }
+
+ @Override
public CompletableFuture<Hbck> getHbck() {
CompletableFuture<Hbck> future = new CompletableFuture<>();
addListener(registry.getMasterAddress(), (sn, error) -> {
@@ -307,9 +343,4 @@ class AsyncConnectionImpl implements AsyncConnection {
return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
}
-
- @Override
- public void clearRegionLocationCache() {
- locator.clearCache();
- }
}
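The toConnection() added above is a double-checked locking lazy initializer: one volatile read on
the fast path, a re-check under the monitor, and publication via the volatile write. For reference,
a standalone sketch of the same pattern; the Holder/Expensive names are hypothetical, not part of
the patch:

    class Holder {
      private volatile Expensive cached;

      Expensive get() {
        Expensive c = this.cached;  // single volatile read on the fast path
        if (c != null) {
          return c;
        }
        synchronized (this) {
          c = this.cached;          // re-check: another thread may have won the race
          if (c == null) {
            c = new Expensive();
            this.cached = c;        // the volatile write safely publishes the instance
          }
          return c;
        }
      }

      static class Expensive {
      }
    }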
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
index 4dde1bb..65326e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -74,7 +74,8 @@ final class AsyncRegionLocatorHelper {
RegionMovedException rme = (RegionMovedException) cause;
HRegionLocation newLoc =
new HRegionLocation(loc.getRegion(), rme.getServerName(), rme.getLocationSeqNum());
- LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc, rme);
+ LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc,
+ rme.toString());
addToCache.accept(newLoc);
} else {
LOG.debug("Try removing {} from cache", loc);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 9b97e93..cd5d5ad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -187,4 +187,8 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
+
+ int getCacheSize() {
+ return queue.size();
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
index 90891f4..b88c40c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -124,7 +125,9 @@ public interface Connection extends Abortable, Closeable {
*
* @return a {@link BufferedMutator} for the supplied tableName.
*/
- BufferedMutator getBufferedMutator(TableName tableName) throws IOException;
+ default BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+ return getBufferedMutator(new BufferedMutatorParams(tableName));
+ }
/**
* Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The
@@ -194,6 +197,14 @@ public interface Connection extends Abortable, Closeable {
TableBuilder getTableBuilder(TableName tableName, ExecutorService pool);
/**
+ * Convert this connection to an {@link AsyncConnection}.
+ *
+ * Usually the same instance will be returned if you call this method multiple times, so this
+ * can be considered a lightweight operation.
+ */
+ AsyncConnection toAsyncConnection();
+
+ /**
* Retrieve an Hbck implementation to fix an HBase cluster.
* The returned Hbck is not guaranteed to be thread-safe. A new instance should be created by
* each thread. This is a lightweight operation. Pooling or caching of the returned Hbck instance
@@ -207,7 +218,7 @@ public interface Connection extends Abortable, Closeable {
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
default Hbck getHbck() throws IOException {
- throw new UnsupportedOperationException("Not implemented");
+ return FutureUtils.get(toAsyncConnection().getHbck());
}
/**
@@ -228,6 +239,6 @@ public interface Connection extends Abortable, Closeable {
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
default Hbck getHbck(ServerName masterServer) throws IOException {
- throw new UnsupportedOperationException("Not implemented");
+ return toAsyncConnection().getHbck(masterServer);
}
}
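Note how Connection now carries working default implementations: getBufferedMutator(TableName)
wraps the params overload, and both getHbck variants route through toAsyncConnection(), so an
implementation only has to provide the async bridge. A hedged usage sketch (conf is assumed to be
a valid Configuration; Hbck is LimitedPrivate, intended for HBCK tooling only):

    try (Connection conn = ConnectionFactory.createConnection(conf);
        // Default implementation: FutureUtils.get(toAsyncConnection().getHbck())
        Hbck hbck = conn.getHbck()) {
      // repair operations against the cluster go here
    }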
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index ceef356..b6d0161 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -211,29 +212,34 @@ public class ConnectionFactory {
* @return Connection object for conf
*/
public static Connection createConnection(Configuration conf, ExecutorService pool,
- final User user) throws IOException {
- String className = conf.get(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
- ConnectionImplementation.class.getName());
- Class<?> clazz;
- try {
- clazz = Class.forName(className);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
- try {
- // Default HCM#HCI is not accessible; make it so before invoking.
- Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
- ExecutorService.class, User.class);
- constructor.setAccessible(true);
- return user.runAs(
- (PrivilegedExceptionAction<Connection>)() ->
- (Connection) constructor.newInstance(conf, pool, user));
- } catch (Exception e) {
- throw new IOException(e);
+ final User user) throws IOException {
+ Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
+ ConnectionOverAsyncConnection.class, Connection.class);
+ if (clazz != ConnectionOverAsyncConnection.class) {
+ try {
+ // Default HCM#HCI is not accessible; make it so before invoking.
+ Constructor<?> constructor =
+ clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
+ constructor.setAccessible(true);
+ return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
+ .newInstance(conf, pool, user));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ } else {
+ return FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
}
}
/**
+ * Create a {@link ConnectionImplementation}, internal use only.
+ */
+ static ConnectionImplementation createConnectionImpl(Configuration conf, ExecutorService pool,
+ User user) throws IOException {
+ return new ConnectionImplementation(conf, pool, user);
+ }
+
+ /**
* Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
* @see #createAsyncConnection(Configuration)
* @return AsyncConnection object wrapped by CompletableFuture
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 7461239..8d3e1f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -41,8 +41,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
@@ -76,7 +74,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
@@ -419,11 +416,6 @@ class ConnectionImplementation implements Connection, Closeable {
}
@Override
- public BufferedMutator getBufferedMutator(TableName tableName) {
- return getBufferedMutator(new BufferedMutatorParams(tableName));
- }
-
- @Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
return new HRegionLocator(tableName, this);
}
@@ -478,30 +470,8 @@ class ConnectionImplementation implements Connection, Closeable {
private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
BlockingQueue<Runnable> passedWorkQueue) {
// shared HTable thread executor not yet initialized
- if (maxThreads == 0) {
- maxThreads = Runtime.getRuntime().availableProcessors() * 8;
- }
- if (coreThreads == 0) {
- coreThreads = Runtime.getRuntime().availableProcessors() * 8;
- }
- long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
- BlockingQueue<Runnable> workQueue = passedWorkQueue;
- if (workQueue == null) {
- workQueue =
- new LinkedBlockingQueue<>(maxThreads *
- conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
- HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
- coreThreads = maxThreads;
- }
- ThreadPoolExecutor tpe = new ThreadPoolExecutor(
- coreThreads,
- maxThreads,
- keepAliveTime,
- TimeUnit.SECONDS,
- workQueue,
- Threads.newDaemonThreadFactory(toString() + nameHint));
- tpe.allowCoreThreadTimeOut(true);
- return tpe;
+ return ConnectionUtils.getThreadPool(conf, maxThreads, coreThreads, () -> toString() + nameHint,
+ passedWorkQueue);
}
private ExecutorService getMetaLookupPool() {
@@ -533,21 +503,10 @@ class ConnectionImplementation implements Connection, Closeable {
private void shutdownPools() {
if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
- shutdownBatchPool(this.batchPool);
+ ConnectionUtils.shutdownPool(this.batchPool);
}
if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
- shutdownBatchPool(this.metaLookupPool);
- }
- }
-
- private void shutdownBatchPool(ExecutorService pool) {
- pool.shutdown();
- try {
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow();
- }
- } catch (InterruptedException e) {
- pool.shutdownNow();
+ ConnectionUtils.shutdownPool(this.metaLookupPool);
}
}
@@ -2210,4 +2169,9 @@ class ConnectionImplementation implements Connection, Closeable {
throw new IOException(cause);
}
}
+
+ @Override
+ public AsyncConnection toAsyncConnection() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
new file mode 100644
index 0000000..61cc708
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+/**
+ * The connection implementation based on {@link AsyncConnection}.
+ */
+@InterfaceAudience.Private
+class ConnectionOverAsyncConnection implements Connection {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectionOverAsyncConnection.class);
+
+ private volatile boolean aborted = false;
+
+ private volatile ExecutorService batchPool = null;
+
+ protected final AsyncConnectionImpl conn;
+
+ /**
+ * @deprecated we cannot implement everything that depends on it at once, so keep it here for
+ * now; it will be removed once the remaining pieces (Admin, RegionLocator, etc.) are implemented.
+ */
+ @Deprecated
+ private final ConnectionImplementation oldConn;
+
+ private final ConnectionConfiguration connConf;
+
+ ConnectionOverAsyncConnection(AsyncConnectionImpl conn, ConnectionImplementation oldConn) {
+ this.conn = conn;
+ this.oldConn = oldConn;
+ this.connConf = new ConnectionConfiguration(conn.getConfiguration());
+ }
+
+ @Override
+ public void abort(String why, Throwable error) {
+ if (error != null) {
+ LOG.error(HBaseMarkers.FATAL, why, error);
+ } else {
+ LOG.error(HBaseMarkers.FATAL, why);
+ }
+ aborted = true;
+ try {
+ Closeables.close(this, true);
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public boolean isAborted() {
+ return aborted;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conn.getConfiguration();
+ }
+
+ @Override
+ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
+ return oldConn.getBufferedMutator(params);
+ }
+
+ @Override
+ public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+ return oldConn.getRegionLocator(tableName);
+ }
+
+ @Override
+ public void clearRegionLocationCache() {
+ conn.clearRegionLocationCache();
+ }
+
+ @Override
+ public Admin getAdmin() throws IOException {
+ return oldConn.getAdmin();
+ }
+
+ @Override
+ public void close() throws IOException {
+ conn.close();
+ }
+
+ // Called from AsyncConnection, to avoid an infinite loop: the close() method above delegates
+ // to AsyncConnection.close.
+ void closeConnImpl() {
+ ExecutorService batchPool = this.batchPool;
+ if (batchPool != null) {
+ ConnectionUtils.shutdownPool(batchPool);
+ this.batchPool = null;
+ }
+ }
+
+ @Override
+ public boolean isClosed() {
+ return conn.isClosed();
+ }
+
+ private ExecutorService getBatchPool() {
+ if (batchPool == null) {
+ synchronized (this) {
+ if (batchPool == null) {
+ int threads = conn.getConfiguration().getInt("hbase.hconnection.threads.max", 256);
+ this.batchPool = ConnectionUtils.getThreadPool(conn.getConfiguration(), threads, threads,
+ () -> toString() + "-shared", null);
+ }
+ }
+ }
+ return this.batchPool;
+ }
+
+ @Override
+ public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
+ return new TableBuilderBase(tableName, connConf) {
+
+ @Override
+ public Table build() {
+ ExecutorService p = pool != null ? pool : getBatchPool();
+ return new TableOverAsyncTable(conn,
+ conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
+ .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
+ .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
+ .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(),
+ p);
+ }
+ };
+ }
+
+ @Override
+ public AsyncConnection toAsyncConnection() {
+ return conn;
+ }
+
+ @Override
+ public Hbck getHbck() throws IOException {
+ return FutureUtils.get(conn.getHbck());
+ }
+
+ @Override
+ public Hbck getHbck(ServerName masterServer) throws IOException {
+ return conn.getHbck(masterServer);
+ }
+
+ /**
+ * An identifier that will remain the same for a given connection.
+ */
+ @Override
+ public String toString() {
+ return "connection-over-async-connection-0x" + Integer.toHexString(hashCode());
+ }
+}
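One property worth noting from this wrapper together with AsyncConnectionImpl.toConnection():
converting back and forth does not stack wrappers, and closing either side tears down both. A
sketch of the identities the code above maintains ("usually" per the interface javadoc; conf is
assumed):

    Connection conn = ConnectionFactory.createConnection(conf);
    AsyncConnection async = conn.toAsyncConnection(); // unwraps to the backing AsyncConnectionImpl
    assert async.toConnection() == conn;              // cached field, same wrapper handed back
    assert conn.toAsyncConnection() == async;
    conn.close(); // delegates to async.close(), which also shuts down the wrapper's batch pool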
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 2af9fa4..46d0fa2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -28,9 +28,12 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -48,9 +51,9 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
import org.apache.yetus.audience.InterfaceAudience;
@@ -64,10 +67,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/**
* Utility used by client connections.
@@ -138,68 +139,6 @@ public final class ConnectionUtils {
}
/**
- * A ClusterConnection that will short-circuit RPC making direct invocations against the localhost
- * if the invocation target is 'this' server; save on network and protobuf invocations.
- */
- // TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid.
- @VisibleForTesting // Class is visible so can assert we are short-circuiting when expected.
- public static class ShortCircuitingClusterConnection extends ConnectionImplementation {
- private final ServerName serverName;
- private final AdminService.BlockingInterface localHostAdmin;
- private final ClientService.BlockingInterface localHostClient;
-
- private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user,
- ServerName serverName, AdminService.BlockingInterface admin,
- ClientService.BlockingInterface client) throws IOException {
- super(conf, pool, user);
- this.serverName = serverName;
- this.localHostAdmin = admin;
- this.localHostClient = client;
- }
-
- @Override
- public AdminService.BlockingInterface getAdmin(ServerName sn) throws IOException {
- return serverName.equals(sn) ? this.localHostAdmin : super.getAdmin(sn);
- }
-
- @Override
- public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
- return serverName.equals(sn) ? this.localHostClient : super.getClient(sn);
- }
-
- @Override
- public MasterKeepAliveConnection getMaster() throws IOException {
- if (this.localHostClient instanceof MasterService.BlockingInterface) {
- return new ShortCircuitMasterConnection(
- (MasterService.BlockingInterface) this.localHostClient);
- }
- return super.getMaster();
- }
- }
-
- /**
- * Creates a short-circuit connection that can bypass the RPC layer (serialization,
- * deserialization, networking, etc..) when talking to a local server.
- * @param conf the current configuration
- * @param pool the thread pool to use for batch operations
- * @param user the user the connection is for
- * @param serverName the local server name
- * @param admin the admin interface of the local server
- * @param client the client interface of the local server
- * @return an short-circuit connection.
- * @throws IOException if IO failure occurred
- */
- public static ConnectionImplementation createShortCircuitConnection(final Configuration conf,
- ExecutorService pool, User user, final ServerName serverName,
- final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
- throws IOException {
- if (user == null) {
- user = UserProvider.instantiate(conf).getCurrent();
- }
- return new ShortCircuitingClusterConnection(conf, pool, user, serverName, admin, client);
- }
-
- /**
* Setup the connection class, so that it will not depend on master being online. Used for testing
* @param conf configuration to set
*/
@@ -711,4 +650,38 @@ public final class ConnectionUtils {
}
return future;
}
+
+ static ExecutorService getThreadPool(Configuration conf, int maxThreads, int coreThreads,
+ Supplier<String> threadName, BlockingQueue<Runnable> passedWorkQueue) {
+ // shared HTable thread executor not yet initialized
+ if (maxThreads == 0) {
+ maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+ }
+ if (coreThreads == 0) {
+ coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+ }
+ long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+ BlockingQueue<Runnable> workQueue = passedWorkQueue;
+ if (workQueue == null) {
+ workQueue =
+ new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+ HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+ coreThreads = maxThreads;
+ }
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
+ TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(threadName.get()));
+ tpe.allowCoreThreadTimeOut(true);
+ return tpe;
+ }
+
+ static void shutdownPool(ExecutorService pool) {
+ pool.shutdown();
+ try {
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ pool.shutdownNow();
+ }
+ }
}
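The shutdownPool helper above is the standard two-phase executor shutdown from the ExecutorService
javadoc: stop intake, wait briefly for in-flight tasks, then force. A generic standalone variant
for reference, which additionally restores the interrupt flag (a common refinement, not part of
the patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    final class Pools {
      static void shutdownGracefully(ExecutorService pool) {
        pool.shutdown();  // reject new tasks; already-queued tasks still run
        try {
          if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            pool.shutdownNow();  // grace period expired: interrupt the workers
          }
        } catch (InterruptedException e) {
          pool.shutdownNow();  // we were interrupted while waiting
          Thread.currentThread().interrupt();  // preserve the caller's interrupt status
        }
      }
    }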
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 21e3b53..536e664 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
@@ -2004,8 +2005,8 @@ public class HBaseAdmin implements Admin {
// Check ZK first.
// If the connection exists, we may have a connection to ZK that does not work anymore
- try (ConnectionImplementation connection =
- (ConnectionImplementation) ConnectionFactory.createConnection(copyOfConf)) {
+ try (ConnectionImplementation connection = ConnectionFactory.createConnectionImpl(copyOfConf,
+ null, UserProvider.instantiate(copyOfConf).getCurrent())) {
// can throw MasterNotRunningException
connection.isMasterRunning();
}
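
The createConnectionImpl call above is internal plumbing; application code that wants the same kind of liveness check keeps going through the public factory. A minimal sketch under that assumption (a reachable cluster described by conf):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class MasterProbeSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // The public factory hides whether the returned Connection is the classic
        // implementation or the new one built over AsyncConnection.
        try (Connection connection = ConnectionFactory.createConnection(conf);
            Admin admin = connection.getAdmin()) {
          System.out.println("tables visible: " + admin.listTableNames().length);
        }
      }
    }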
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index b43f070..6ec6df6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -100,7 +99,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
-public class HTable implements Table {
+class HTable implements Table {
private static final Logger LOG = LoggerFactory.getLogger(HTable.class);
private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
private final ConnectionImplementation connection;
@@ -655,29 +654,6 @@ public class HTable implements Table {
callWithRetries(callable, this.operationTimeoutMs);
}
- @Override
- @Deprecated
- public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
- final byte [] value, final Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put);
- }
-
- @Override
- @Deprecated
- public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put);
- }
-
- @Override
- @Deprecated
- public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOperator op, final byte [] value, final Put put) throws IOException {
- // The names of the operators in CompareOperator are intentionally those of the
- // operators in the filter's CompareOp enum.
- return doCheckAndPut(row, family, qualifier, op.name(), value, null, put);
- }
-
private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
final String opName, final byte[] value, final TimeRange timeRange, final Put put)
throws IOException {
@@ -698,28 +674,6 @@ public class HTable implements Table {
.callWithRetries(callable, this.operationTimeoutMs);
}
- @Override
- @Deprecated
- public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
- final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null,
- delete);
- }
-
- @Override
- @Deprecated
- public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
- final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete);
- }
-
- @Override
- @Deprecated
- public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
- final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete);
- }
-
private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final String opName, final byte[] value, final TimeRange timeRange, final Delete delete)
throws IOException {
@@ -812,21 +766,6 @@ public class HTable implements Table {
}
@Override
- @Deprecated
- public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOp compareOp, final byte [] value, final RowMutations rm)
- throws IOException {
- return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm);
- }
-
- @Override
- @Deprecated
- public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
- return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm);
- }
-
- @Override
public boolean exists(final Get get) throws IOException {
Result r = get(get, true);
assert r.getExists() != null;
@@ -928,23 +867,6 @@ public class HTable implements Table {
}
@Override
- public <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service,
- byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable)
- throws ServiceException, Throwable {
- final Map<byte[], R> results = Collections.synchronizedMap(
- new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
- coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
- @Override
- public void update(byte[] region, byte[] row, R value) {
- if (region != null) {
- results.put(region, value);
- }
- }
- });
- return results;
- }
-
- @Override
public <T extends Service, R> void coprocessorService(final Class<T> service,
byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable,
final Batch.Callback<R> callback) throws ServiceException, Throwable {
@@ -1000,93 +922,26 @@ public class HTable implements Table {
}
@Override
- @Deprecated
- public int getRpcTimeout() {
- return rpcTimeoutMs;
- }
-
- @Override
- @Deprecated
- public void setRpcTimeout(int rpcTimeout) {
- setReadRpcTimeout(rpcTimeout);
- setWriteRpcTimeout(rpcTimeout);
- }
-
- @Override
public long getReadRpcTimeout(TimeUnit unit) {
return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS);
}
@Override
- @Deprecated
- public int getReadRpcTimeout() {
- return readRpcTimeoutMs;
- }
-
- @Override
- @Deprecated
- public void setReadRpcTimeout(int readRpcTimeout) {
- this.readRpcTimeoutMs = readRpcTimeout;
- }
-
- @Override
public long getWriteRpcTimeout(TimeUnit unit) {
return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS);
}
@Override
- @Deprecated
- public int getWriteRpcTimeout() {
- return writeRpcTimeoutMs;
- }
-
- @Override
- @Deprecated
- public void setWriteRpcTimeout(int writeRpcTimeout) {
- this.writeRpcTimeoutMs = writeRpcTimeout;
- }
-
- @Override
public long getOperationTimeout(TimeUnit unit) {
return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS);
}
@Override
- @Deprecated
- public int getOperationTimeout() {
- return operationTimeoutMs;
- }
-
- @Override
- @Deprecated
- public void setOperationTimeout(int operationTimeout) {
- this.operationTimeoutMs = operationTimeout;
- }
-
- @Override
public String toString() {
return tableName + ";" + connection;
}
@Override
- public <R extends Message> Map<byte[], R> batchCoprocessorService(
- Descriptors.MethodDescriptor methodDescriptor, Message request,
- byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
- final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
- Bytes.BYTES_COMPARATOR));
- batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
- new Callback<R>() {
- @Override
- public void update(byte[] region, byte[] row, R result) {
- if (region != null) {
- results.put(region, result);
- }
- }
- });
- return results;
- }
-
- @Override
public <R extends Message> void batchCoprocessorService(
final Descriptors.MethodDescriptor methodDescriptor, final Message request,
byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
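
With HTable dropped to package-private visibility above, the only supported way to obtain a Table is through a Connection. A minimal sketch; the table name and column coordinates are placeholders for an existing table:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TableAccessSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf("t"))) {
          // Works the same whether HTable or TableOverAsyncTable sits underneath.
          table.put(new Put(Bytes.toBytes("row"))
              .addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")));
        }
      }
    }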
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
index 94e7d9a..3c25c57 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@@ -57,6 +58,8 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
private final long operationTimeoutNs;
+ private byte[] lastRegion;
+
RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
this.conn = conn;
@@ -71,15 +74,13 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
ClientService.Interface stub) {
CompletableFuture<Message> future = new CompletableFuture<>();
- if (region != null
- && !Bytes.equals(loc.getRegionInfo().getRegionName(), region.getRegionName())) {
- future.completeExceptionally(new DoNotRetryIOException(
- "Region name is changed, expected " + region.getRegionNameAsString() + ", actual "
- + loc.getRegionInfo().getRegionNameAsString()));
+ if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), region.getRegionName())) {
+ future.completeExceptionally(new DoNotRetryIOException("Region name is changed, expected " +
+ region.getRegionNameAsString() + ", actual " + loc.getRegion().getRegionNameAsString()));
return future;
}
CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
- request, row, loc.getRegionInfo().getRegionName());
+ request, row, loc.getRegion().getRegionName());
stub.execService(controller, csr,
new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
@@ -88,6 +89,7 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
+ lastRegion = resp.getRegion().getValue().toByteArray();
try {
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
} catch (IOException e) {
@@ -99,6 +101,23 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
return future;
}
+ protected final void setError(RpcController controller, Throwable error) {
+ if (controller == null) {
+ return;
+ }
+ if (controller instanceof ServerRpcController) {
+ if (error instanceof IOException) {
+ ((ServerRpcController) controller).setFailedOn((IOException) error);
+ } else {
+ ((ServerRpcController) controller).setFailedOn(new IOException(error));
+ }
+ } else if (controller instanceof ClientCoprocessorRpcController) {
+ ((ClientCoprocessorRpcController) controller).setFailed(error);
+ } else {
+ controller.setFailed(error.toString());
+ }
+ }
+
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
@@ -109,9 +128,13 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
.action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(),
(r, e) -> {
if (e != null) {
- ((ClientCoprocessorRpcController) controller).setFailed(e);
+ setError(controller, e);
}
done.run(r);
});
}
+
+ public byte[] getLastRegion() {
+ return lastRegion;
+ }
}
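
The setError helper above dispatches on the controller type. A standalone sketch of the ServerRpcController branch, using only that controller's public API (the channel method itself is protected); recordFailure is a hypothetical stand-in:

    import java.io.IOException;
    import org.apache.hadoop.hbase.ipc.ServerRpcController;

    public class SetErrorSketch {
      // Mirrors the dispatch above: IOExceptions are recorded as-is,
      // anything else gets wrapped in an IOException first.
      static void recordFailure(ServerRpcController controller, Throwable error) {
        if (error instanceof IOException) {
          controller.setFailedOn((IOException) error);
        } else {
          controller.setFailedOn(new IOException(error));
        }
      }

      public static void main(String[] args) {
        ServerRpcController controller = new ServerRpcController();
        recordFailure(controller, new RuntimeException("region moved"));
        System.out.println("failed: " + controller.failedOnException());
      }
    }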
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 23bb5ce..f73c3e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -1113,6 +1113,11 @@ public class Scan extends Query {
return asyncPrefetch;
}
+ /**
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Now that the sync client is built upon the
+ * async client, the implementation always does 'async prefetch', so this flag no longer has
+ * any effect.
+ */
+ @Deprecated
public Scan setAsyncPrefetch(boolean asyncPrefetch) {
this.asyncPrefetch = asyncPrefetch;
return this;
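
A short illustration of the deprecation above; under the async-backed sync client the scanner always prefetches in the background, so the flag no longer changes behavior:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanPrefetchSketch {
      public static void main(String[] args) {
        Scan scan = new Scan().withStartRow(Bytes.toBytes("row-0"));
        // Deprecated since 3.0.0: kept only for source compatibility; the
        // implementation behaves as if async prefetch were always on.
        scan.setAsyncPrefetch(true);
      }
    }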
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 6908424..f6519a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -18,30 +18,28 @@
*/
package org.apache.hadoop.hbase.client;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.yetus.audience.InterfaceAudience;
-
import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* Used to communicate with a single HBase table.
@@ -70,23 +68,6 @@ public interface Table extends Closeable {
Configuration getConfiguration();
/**
- * Gets the {@link org.apache.hadoop.hbase.HTableDescriptor table descriptor} for this table.
- * @throws java.io.IOException if a remote or network exception occurs.
- * @deprecated since 2.0 version and will be removed in 3.0 version.
- * use {@link #getDescriptor()}
- */
- @Deprecated
- default HTableDescriptor getTableDescriptor() throws IOException {
- TableDescriptor descriptor = getDescriptor();
-
- if (descriptor instanceof HTableDescriptor) {
- return (HTableDescriptor)descriptor;
- } else {
- return new HTableDescriptor(descriptor);
- }
- }
-
- /**
* Gets the {@link org.apache.hadoop.hbase.client.TableDescriptor table descriptor} for this table.
* @throws java.io.IOException if a remote or network exception occurs.
*/
@@ -130,24 +111,6 @@ public interface Table extends Closeable {
}
/**
- * Test for the existence of columns in the table, as specified by the Gets.
- * This will return an array of booleans. Each value will be true if the related Get matches
- * one or more keys, false if not.
- * This is a server-side call so it prevents any data from being transferred to
- * the client.
- *
- * @param gets the Gets
- * @return Array of boolean. True if the specified Get matches one or more keys, false if not.
- * @throws IOException e
- * @deprecated since 2.0 version and will be removed in 3.0 version.
- * use {@link #exists(List)}
- */
- @Deprecated
- default boolean[] existsAll(List<Get> gets) throws IOException {
- return exists(gets);
- }
-
- /**
* Method that does a batch call on Deletes, Gets, Puts, Increments, Appends, RowMutations.
* The ordering of execution of the actions is not defined. Meaning if you do a Put and a
* Get in the same {@link #batch} call, you will not necessarily be
@@ -169,10 +132,15 @@ public interface Table extends Closeable {
/**
* Same as {@link #batch(List, Object[])}, but with a callback.
* @since 0.96.0
+ * @deprecated since 3.0.0, will be removed in 4.0.0. Please use the batch related methods in
+ * {@link AsyncTable} directly if you want to use a callback. We reuse the coprocessor
+ * callback here, and the problem is that, for batch operations, the
+ * {@link AsyncTable} does not tell us the region, so this method needs an extra
+ * region lookup after each result arrives, which is not good.
*/
- default <R> void batchCallback(
- final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
- throws IOException, InterruptedException {
+ @Deprecated
+ default <R> void batchCallback(final List<? extends Row> actions, final Object[] results,
+ final Batch.Callback<R> callback) throws IOException, InterruptedException {
throw new NotImplementedException("Add an implementation!");
}
@@ -283,84 +251,6 @@ public interface Table extends Closeable {
}
/**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the put. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param value the expected value
- * @param put data to put if check succeeds
- * @throws IOException e
- * @return true if the new put was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
- throws IOException {
- return checkAndPut(row, family, qualifier, CompareOperator.EQUAL, value, put);
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the put. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> add the put.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp comparison operator to use
- * @param value the expected value
- * @param put data to put if check succeeds
- * @throws IOException e
- * @return true if the new put was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
- CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
- RowMutations mutations = new RowMutations(put.getRow(), 1);
- mutations.add(put);
-
- return checkAndMutate(row, family, qualifier, compareOp, value, mutations);
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the put. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> add the put.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param op comparison operator to use
- * @param value the expected value
- * @param put data to put if check succeeds
- * @throws IOException e
- * @return true if the new put was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
- byte[] value, Put put) throws IOException {
- RowMutations mutations = new RowMutations(put.getRow(), 1);
- mutations.add(put);
-
- return checkAndMutate(row, family, qualifier, op, value, mutations);
- }
-
- /**
* Deletes the specified cells/row.
*
* @param delete The object that specifies what to delete.
@@ -399,84 +289,6 @@ public interface Table extends Closeable {
}
/**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the delete. If the passed value is null, the
- * check is for the lack of column (ie: non-existence)
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param value the expected value
- * @param delete data to delete if check succeeds
- * @throws IOException e
- * @return true if the new delete was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- byte[] value, Delete delete) throws IOException {
- return checkAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, delete);
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the delete. If the passed value is null, the
- * check is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> add the delete.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp comparison operator to use
- * @param value the expected value
- * @param delete data to delete if check succeeds
- * @throws IOException e
- * @return true if the new delete was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
- RowMutations mutations = new RowMutations(delete.getRow(), 1);
- mutations.add(delete);
-
- return checkAndMutate(row, family, qualifier, compareOp, value, mutations);
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the delete. If the passed value is null, the
- * check is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> add the delete.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param op comparison operator to use
- * @param value the expected value
- * @param delete data to delete if check succeeds
- * @throws IOException e
- * @return true if the new delete was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- CompareOperator op, byte[] value, Delete delete) throws IOException {
- RowMutations mutations = new RowMutations(delete.getRow(), 1);
- mutations.add(delete);
-
- return checkAndMutate(row, family, qualifier, op, value, mutations);
- }
-
- /**
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
* adds the Put/Delete/RowMutations.
*
@@ -643,32 +455,35 @@ public interface Table extends Closeable {
}
/**
- * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the
- * table region containing the specified row. The row given does not actually have
- * to exist. Whichever region would contain the row based on start and end keys will
- * be used. Note that the {@code row} parameter is also not passed to the
- * coprocessor handler registered for this protocol, unless the {@code row}
- * is separately passed as an argument in the service request. The parameter
- * here is only used to locate the region used to handle the call.
- *
+ * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the table
+ * region containing the specified row. The row given does not actually have to exist. Whichever
+ * region would contain the row based on start and end keys will be used. Note that the
+ * {@code row} parameter is also not passed to the coprocessor handler registered for this
+ * protocol, unless the {@code row} is separately passed as an argument in the service request.
+ * The parameter here is only used to locate the region used to handle the call.
*
* The obtained {@link com.google.protobuf.RpcChannel} instance can be used to access a published
* coprocessor {@link com.google.protobuf.Service} using standard protobuf service invocations:
*
* @param row The row key used to identify the remote region location
* @return A CoprocessorRpcChannel instance
+ * @deprecated since 3.0.0, will be removed in 4.0.0. This is too low level, please stop using
+ * it. Use the coprocessorService methods in {@link AsyncTable} instead.
+ * @see Connection#toAsyncConnection()
*/
+ @Deprecated
default CoprocessorRpcChannel coprocessorService(byte[] row) {
throw new NotImplementedException("Add an implementation!");
}
@@ -678,25 +493,41 @@ public interface Table extends Closeable {
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), and
* invokes the passed {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
* with each {@link com.google.protobuf.Service} instance.
- *
* @param service the protocol buffer {@code Service} implementation to call
- * @param startKey start region selection with region containing this row. If {@code null}, the
- * selection will start with the first table region.
+ * @param startKey start region selection with region containing this row. If {@code null}, the
+ * selection will start with the first table region.
* @param endKey select regions up to and including the region containing this row. If
- * {@code null}, selection will continue through the last table region.
+ * {@code null}, selection will continue through the last table region.
* @param callable this instance's
- * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call}
- * method will be invoked once per table region, using the {@link com.google.protobuf.Service}
- * instance connected to that region.
+ * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method will be
+ * invoked once per table region, using the {@link com.google.protobuf.Service} instance
+ * connected to that region.
* @param <T> the {@link com.google.protobuf.Service} subclass to connect to
- * @param <R> Return type for the {@code callable} parameter's {@link
- * org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
+ * @param <R> Return type for the {@code callable} parameter's
+ * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
* @return a map of result values keyed by region name
+ * @deprecated since 3.0.0, will be removed in 4.0.0. The batch call here references the blocking
+ * interface of a protobuf stub, so it is not possible to do it in an asynchronous
+ * way, even though we now build the {@link Table} implementation on top of the
+ * {@link AsyncTable}, which is not good. Use the coprocessorService methods in
+ * {@link AsyncTable} directly instead.
+ * @see Connection#toAsyncConnection()
*/
- default <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service,
- byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable)
- throws ServiceException, Throwable {
- throw new NotImplementedException("Add an implementation!");
+ @Deprecated
+ default <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service,
+ byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable)
+ throws ServiceException, Throwable {
+ Map<byte[], R> results =
+ Collections.synchronizedMap(new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
+ coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
+ @Override
+ public void update(byte[] region, byte[] row, R value) {
+ if (region != null) {
+ results.put(region, value);
+ }
+ }
+ });
+ return results;
}
/**
@@ -704,28 +535,35 @@ public interface Table extends Closeable {
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), and
* invokes the passed {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
* with each {@link Service} instance.
- *
- * <p>
- * The given
+ * <p>
+ * The given
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[],byte[],Object)}
* method will be called with the return value from each region's
- * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} invocation.
* @param service the protocol buffer {@code Service} implementation to call
- * @param startKey start region selection with region containing this row. If {@code null}, the
- * selection will start with the first table region.
+ * @param startKey start region selection with region containing this row. If {@code null}, the
+ * selection will start with the first table region.
* @param endKey select regions up to and including the region containing this row. If
- * {@code null}, selection will continue through the last table region.
+ * {@code null}, selection will continue through the last table region.
* @param callable this instance's
- * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call}
- * method will be invoked once per table region, using the {@link Service} instance connected to
- * that region.
+ * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method will be
+ * invoked once per table region, using the {@link Service} instance connected to that
+ * region.
* @param <T> the {@link Service} subclass to connect to
- * @param <R> Return type for the {@code callable} parameter's {@link
- * org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
+ * @param <R> Return type for the {@code callable} parameter's
+ * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
+ * @deprecated since 3.0.0, will be removed in 4.0.0. The batch call here references the blocking
+ * interface of a protobuf stub, so it is not possible to do it in an asynchronous
+ * way, even though we now build the {@link Table} implementation on top of the
+ * {@link AsyncTable}, which is not good. Use the coprocessorService methods in
+ * {@link AsyncTable} directly instead.
+ * @see Connection#toAsyncConnection()
*/
- default <T extends Service, R> void coprocessorService(final Class<T> service,
- byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable,
- final Batch.Callback<R> callback) throws ServiceException, Throwable {
+ @Deprecated
+ default <T extends Service, R> void coprocessorService(final Class<T> service, byte[] startKey,
+ byte[] endKey, final Batch.Call<T, R> callable, final Batch.Callback<R> callback)
+ throws ServiceException, Throwable {
throw new NotImplementedException("Add an implementation!");
}
@@ -734,27 +572,38 @@ public interface Table extends Closeable {
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all
* the invocations to the same region server will be batched into one call. The coprocessor
* service is invoked according to the service instance, method name and parameters.
- *
- * @param methodDescriptor
- * the descriptor for the protobuf service method to call.
- * @param request
- * the method call parameters
- * @param startKey
- * start region selection with region containing this row. If {@code null}, the
+ * @param methodDescriptor the descriptor for the protobuf service method to call.
+ * @param request the method call parameters
+ * @param startKey start region selection with region containing this row. If {@code null}, the
* selection will start with the first table region.
- * @param endKey
- * select regions up to and including the region containing this row. If {@code null},
- * selection will continue through the last table region.
- * @param responsePrototype
- * the proto type of the response of the method in Service.
- * @param <R>
- * the response type for the coprocessor Service method
+ * @param endKey select regions up to and including the region containing this row. If
+ * {@code null}, selection will continue through the last table region.
+ * @param responsePrototype the proto type of the response of the method in Service.
+ * @param <R> the response type for the coprocessor Service method
* @return a map of result values keyed by region name
+ * @deprecated since 3.0.0, will be removed in 4.0.0. The batch call here references the blocking
+ * interface of a protobuf stub, so it is not possible to do it in an asynchronous
+ * way, even though we now build the {@link Table} implementation on top of the
+ * {@link AsyncTable}, which is not good. Use the coprocessorService methods in
+ * {@link AsyncTable} directly instead.
+ * @see Connection#toAsyncConnection()
*/
+ @Deprecated
default <R extends Message> Map<byte[], R> batchCoprocessorService(
- Descriptors.MethodDescriptor methodDescriptor, Message request,
- byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
- throw new NotImplementedException("Add an implementation!");
+ Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey,
+ byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
+ final Map<byte[], R> results =
+ Collections.synchronizedMap(new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
+ batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
+ new Callback<R>() {
+ @Override
+ public void update(byte[] region, byte[] row, R result) {
+ if (region != null) {
+ results.put(region, result);
+ }
+ }
+ });
+ return results;
}
/**
@@ -762,24 +611,28 @@ public interface Table extends Closeable {
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all
* the invocations to the same region server will be batched into one call. The coprocessor
* service is invoked according to the service instance, method name and parameters.
- *
*
* The given
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[],byte[],Object)}
* method will be called with the return value from each region's invocation.
*
- *
* @param methodDescriptor the descriptor for the protobuf service method to call.
* @param request the method call parameters
- * @param startKey start region selection with region containing this row.
- * If {@code null}, the selection will start with the first table region.
- * @param endKey select regions up to and including the region containing this row.
- * If {@code null}, selection will continue through the last table region.
+ * @param startKey start region selection with region containing this row. If {@code null}, the
+ * selection will start with the first table region.
+ * @param endKey select regions up to and including the region containing this row. If
+ * {@code null}, selection will continue through the last table region.
* @param responsePrototype the proto type of the response of the method in Service.
* @param callback callback to invoke with the response for each region
- * @param <R>
- * the response type for the coprocessor Service method
+ * @param <R> the response type for the coprocessor Service method
+ * @deprecated since 3.0.0, will be removed in 4.0.0. The batch call here references the blocking
+ * interface of a protobuf stub, so it is not possible to do it in an asynchronous
+ * way, even though we now build the {@link Table} implementation on top of the
+ * {@link AsyncTable}, which is not good. Use the coprocessorService methods in
+ * {@link AsyncTable} directly instead.
+ * @see Connection#toAsyncConnection()
*/
+ @Deprecated
default <R extends Message> void batchCoprocessorService(
Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey,
byte[] endKey, R responsePrototype, Batch.Callback<R> callback)
@@ -788,58 +641,6 @@ public interface Table extends Closeable {
}
/**
- * Atomically checks if a row/family/qualifier value matches the expected value.
- * If it does, it performs the row mutations. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> perform row mutations.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp the comparison operator
- * @param value the expected value
- * @param mutation mutations to perform if check succeeds
- * @throws IOException e
- * @return true if the new put was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
- throw new NotImplementedException("Add an implementation!");
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected value.
- * If it does, it performs the row mutations. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> perform row mutations.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param op the comparison operator
- * @param value the expected value
- * @param mutation mutations to perform if check succeeds
- * @throws IOException e
- * @return true if the new put was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
- byte[] value, RowMutations mutation) throws IOException {
- throw new NotImplementedException("Add an implementation!");
- }
-
- /**
* Get timeout of each rpc request in this Table instance. It will be overridden by a more
* specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
* @see #getReadRpcTimeout(TimeUnit)
@@ -852,36 +653,6 @@ public interface Table extends Closeable {
}
/**
- * Get timeout (millisecond) of each rpc request in this Table instance.
- *
- * @return Currently configured read timeout
- * @deprecated use {@link #getReadRpcTimeout(TimeUnit)} or
- * {@link #getWriteRpcTimeout(TimeUnit)} instead
- */
- @Deprecated
- default int getRpcTimeout() {
- return (int)getRpcTimeout(TimeUnit.MILLISECONDS);
- }
-
- /**
- * Set timeout (millisecond) of each rpc request in operations of this Table instance, will
- * override the value of hbase.rpc.timeout in configuration.
- * If a rpc request waiting too long, it will stop waiting and send a new request to retry until
- * retries exhausted or operation timeout reached.
- *
- * NOTE: This will set both the read and write timeout settings to the provided value.
- *
- * @param rpcTimeout the timeout of each rpc request in millisecond.
- *
- * @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead
- */
- @Deprecated
- default void setRpcTimeout(int rpcTimeout) {
- setReadRpcTimeout(rpcTimeout);
- setWriteRpcTimeout(rpcTimeout);
- }
-
- /**
* Get timeout of each rpc read request in this Table instance.
* @param unit the unit of time the timeout to be represented in
* @return read rpc timeout in the specified time unit
@@ -891,30 +662,6 @@ public interface Table extends Closeable {
}
/**
- * Get timeout (millisecond) of each rpc read request in this Table instance.
- * @deprecated since 2.0 and will be removed in 3.0 version
- * use {@link #getReadRpcTimeout(TimeUnit)} instead
- */
- @Deprecated
- default int getReadRpcTimeout() {
- return (int)getReadRpcTimeout(TimeUnit.MILLISECONDS);
- }
-
- /**
- * Set timeout (millisecond) of each rpc read request in operations of this Table instance, will
- * override the value of hbase.rpc.read.timeout in configuration.
- * If a rpc read request waiting too long, it will stop waiting and send a new request to retry
- * until retries exhausted or operation timeout reached.
- *
- * @param readRpcTimeout the timeout for read rpc request in milliseconds
- * @deprecated since 2.0.0, use {@link TableBuilder#setReadRpcTimeout} instead
- */
- @Deprecated
- default void setReadRpcTimeout(int readRpcTimeout) {
- throw new NotImplementedException("Add an implementation!");
- }
-
- /**
* Get timeout of each rpc write request in this Table instance.
* @param unit the unit of time the timeout to be represented in
* @return write rpc timeout in the specified time unit
@@ -924,30 +671,6 @@ public interface Table extends Closeable {
}
/**
- * Get timeout (millisecond) of each rpc write request in this Table instance.
- * @deprecated since 2.0 and will be removed in 3.0 version
- * use {@link #getWriteRpcTimeout(TimeUnit)} instead
- */
- @Deprecated
- default int getWriteRpcTimeout() {
- return (int)getWriteRpcTimeout(TimeUnit.MILLISECONDS);
- }
-
- /**
- * Set timeout (millisecond) of each rpc write request in operations of this Table instance, will
- * override the value of hbase.rpc.write.timeout in configuration.
- * If a rpc write request waiting too long, it will stop waiting and send a new request to retry
- * until retries exhausted or operation timeout reached.
- *
- * @param writeRpcTimeout the timeout for write rpc request in milliseconds
- * @deprecated since 2.0.0, use {@link TableBuilder#setWriteRpcTimeout} instead
- */
- @Deprecated
- default void setWriteRpcTimeout(int writeRpcTimeout) {
- throw new NotImplementedException("Add an implementation!");
- }
-
- /**
* Get timeout of each operation in Table instance.
* @param unit the unit of time the timeout to be represented in
* @return operation rpc timeout in the specified time unit
@@ -955,30 +678,4 @@ public interface Table extends Closeable {
default long getOperationTimeout(TimeUnit unit) {
throw new NotImplementedException("Add an implementation!");
}
-
- /**
- * Get timeout (millisecond) of each operation in this Table instance.
- * @deprecated since 2.0 and will be removed in 3.0 version
- * use {@link #getOperationTimeout(TimeUnit)} instead
- */
- @Deprecated
- default int getOperationTimeout() {
- return (int)getOperationTimeout(TimeUnit.MILLISECONDS);
- }
-
- /**
- * Set timeout (millisecond) of each operation in this Table instance, will override the value
- * of hbase.client.operation.timeout in configuration.
- * Operation timeout is a top-level restriction that makes sure a blocking method will not be
- * blocked more than this. In each operation, if rpc request fails because of timeout or
- * other reason, it will retry until success or throw a RetriesExhaustedException. But if the
- * total time being blocking reach the operation timeout before retries exhausted, it will break
- * early and throw SocketTimeoutException.
- * @param operationTimeout the total timeout of each operation in millisecond.
- * @deprecated since 2.0.0, use {@link TableBuilder#setOperationTimeout} instead
- */
- @Deprecated
- default void setOperationTimeout(int operationTimeout) {
- throw new NotImplementedException("Add an implementation!");
- }
}
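
The deprecation notes above all point at the AsyncTable coprocessor API. A hedged migration sketch follows: MyService, MyRequest and MyResponse are hypothetical placeholders for an application's generated protobuf endpoint, and toAsyncConnection() is the bridge this patch series adds to Connection:

    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.AsyncTable;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CoprocessorMigrationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
          AsyncConnection asyncConn = connection.toAsyncConnection();
          AsyncTable<?> table = asyncConn.getTable(TableName.valueOf("t"));
          // One region, one call: the row argument only selects the region.
          CompletableFuture<MyResponse> future = table.coprocessorService(
              MyService::newStub, // wraps the RpcChannel in the generated stub
              (stub, controller, done) ->
                  stub.myCall(controller, MyRequest.getDefaultInstance(), done),
              Bytes.toBytes("row"));
          System.out.println(future.get());
        }
      }
    }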
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
new file mode 100644
index 0000000..7146212
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
+
+/**
+ * The table implementation based on {@link AsyncTable}.
+ */
+@InterfaceAudience.Private
+class TableOverAsyncTable implements Table {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TableOverAsyncTable.class);
+
+ private final AsyncConnectionImpl conn;
+
+ private final AsyncTable<?> table;
+
+ private final ExecutorService pool;
+
+ TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table, ExecutorService pool) {
+ this.conn = conn;
+ this.table = table;
+ this.pool = pool;
+ }
+
+ @Override
+ public TableName getName() {
+ return table.getName();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return table.getConfiguration();
+ }
+
+ @Override
+ public TableDescriptor getDescriptor() throws IOException {
+ return FutureUtils.get(conn.getAdmin().getDescriptor(getName()));
+ }
+
+ @Override
+ public boolean exists(Get get) throws IOException {
+ return FutureUtils.get(table.exists(get));
+ }
+
+ @Override
+ public boolean[] exists(List<Get> gets) throws IOException {
+ return Booleans.toArray(FutureUtils.get(table.existsAll(gets)));
+ }
+
+ @Override
+ public void batch(List<? extends Row> actions, Object[] results) throws IOException {
+ if (ArrayUtils.isEmpty(results)) {
+ FutureUtils.get(table.batchAll(actions));
+ return;
+ }
+ List<ThrowableWithExtraContext> errors = new ArrayList<>();
+ List<CompletableFuture<Object>> futures = table.batch(actions);
+ for (int i = 0, n = results.length; i < n; i++) {
+ try {
+ results[i] = FutureUtils.get(futures.get(i));
+ } catch (IOException e) {
+ results[i] = e;
+ errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
+ "Error when processing " + actions.get(i)));
+ }
+ }
+ if (!errors.isEmpty()) {
+ throw new RetriesExhaustedException(errors.size(), errors);
+ }
+ }
+
+ @Override
+ public <R> void batchCallback(List<? extends Row> actions, Object[] results, Callback<R> callback)
+ throws IOException, InterruptedException {
+ ConcurrentLinkedQueue<ThrowableWithExtraContext> errors = new ConcurrentLinkedQueue<>();
+ CountDownLatch latch = new CountDownLatch(actions.size());
+ AsyncTableRegionLocator locator = conn.getRegionLocator(getName());
+ List<CompletableFuture<R>> futures = table.<R> batch(actions);
+ for (int i = 0, n = futures.size(); i < n; i++) {
+ final int index = i;
+ FutureUtils.addListener(futures.get(i), (r, e) -> {
+ if (e != null) {
+ errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
+ "Error when processing " + actions.get(index)));
+ if (!ArrayUtils.isEmpty(results)) {
+ results[index] = e;
+ }
+ latch.countDown();
+ } else {
+ if (!ArrayUtils.isEmpty(results)) {
+ results[index] = r;
+ }
+ FutureUtils.addListener(locator.getRegionLocation(actions.get(index).getRow()),
+ (l, le) -> {
+ if (le != null) {
+ errors.add(new ThrowableWithExtraContext(le, EnvironmentEdgeManager.currentTime(),
+ "Error when finding the region for row " +
+ Bytes.toStringBinary(actions.get(index).getRow())));
+ } else {
+ callback.update(l.getRegion().getRegionName(), actions.get(index).getRow(), r);
+ }
+ latch.countDown();
+ });
+ }
+ });
+ }
+ latch.await();
+ if (!errors.isEmpty()) {
+ throw new RetriesExhaustedException(errors.size(),
+ errors.stream().collect(Collectors.toList()));
+ }
+ }
+
+ @Override
+ public Result get(Get get) throws IOException {
+ return FutureUtils.get(table.get(get));
+ }
+
+ @Override
+ public Result[] get(List<Get> gets) throws IOException {
+ return FutureUtils.get(table.getAll(gets)).toArray(new Result[0]);
+ }
+
+ @Override
+ public ResultScanner getScanner(Scan scan) throws IOException {
+ return table.getScanner(scan);
+ }
+
+ @Override
+ public ResultScanner getScanner(byte[] family) throws IOException {
+ return table.getScanner(family);
+ }
+
+ @Override
+ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
+ return table.getScanner(family, qualifier);
+ }
+
+ @Override
+ public void put(Put put) throws IOException {
+ FutureUtils.get(table.put(put));
+ }
+
+ @Override
+ public void put(List<Put> puts) throws IOException {
+ FutureUtils.get(table.putAll(puts));
+ }
+
+ @Override
+ public void delete(Delete delete) throws IOException {
+ FutureUtils.get(table.delete(delete));
+ }
+
+ @Override
+ public void delete(List<Delete> deletes) throws IOException {
+ FutureUtils.get(table.deleteAll(deletes));
+ }
+
+ private static final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
+
+ private final AsyncTable.CheckAndMutateBuilder builder;
+
+ public CheckAndMutateBuilderImpl(
+ org.apache.hadoop.hbase.client.AsyncTable.CheckAndMutateBuilder builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public CheckAndMutateBuilder qualifier(byte[] qualifier) {
+ builder.qualifier(qualifier);
+ return this;
+ }
+
+ @Override
+ public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
+ builder.timeRange(timeRange);
+ return this;
+ }
+
+ @Override
+ public CheckAndMutateBuilder ifNotExists() {
+ builder.ifNotExists();
+ return this;
+ }
+
+ @Override
+ public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
+ builder.ifMatches(compareOp, value);
+ return this;
+ }
+
+ @Override
+ public boolean thenPut(Put put) throws IOException {
+ return FutureUtils.get(builder.thenPut(put));
+ }
+
+ @Override
+ public boolean thenDelete(Delete delete) throws IOException {
+ return FutureUtils.get(builder.thenDelete(delete));
+ }
+
+ @Override
+ public boolean thenMutate(RowMutations mutation) throws IOException {
+ return FutureUtils.get(builder.thenMutate(mutation));
+ }
+ }
+
+ @Override
+ public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
+ return new CheckAndMutateBuilderImpl(table.checkAndMutate(row, family));
+ }
+
+ @Override
+ public void mutateRow(RowMutations rm) throws IOException {
+ FutureUtils.get(table.mutateRow(rm));
+ }
+
+ @Override
+ public Result append(Append append) throws IOException {
+ return FutureUtils.get(table.append(append));
+ }
+
+ @Override
+ public Result increment(Increment increment) throws IOException {
+ return FutureUtils.get(table.increment(increment));
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
+ throws IOException {
+ return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount));
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
+ Durability durability) throws IOException {
+ return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount, durability));
+ }
+
+ @Override
+ public void close() {
+ }
+
+ private static final class BlockingRpcCallback<R> implements RpcCallback<R> {
+ private R result;
+ private boolean resultSet = false;
+
+ /**
+ * Called on completion of the RPC call with the response object, or {@code null} in the case of
+ * an error.
+ * @param parameter the response object or {@code null} if an error occurred
+ */
+ @Override
+ public void run(R parameter) {
+ synchronized (this) {
+ result = parameter;
+ resultSet = true;
+ this.notifyAll();
+ }
+ }
+
+ /**
+ * Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was
+ * passed. When used asynchronously, this method will block until the {@link #run(Object)}
+ * method has been called.
+ * @return the response object or {@code null} if no response was passed
+ */
+ public synchronized R get() throws IOException {
+ while (!resultSet) {
+ try {
+ this.wait();
+ } catch (InterruptedException ie) {
+ InterruptedIOException exception = new InterruptedIOException(ie.getMessage());
+ exception.initCause(ie);
+ throw exception;
+ }
+ }
+ return result;
+ }
+ }
+
+ private static final class RegionCoprocessorRpcChannel extends RegionCoprocessorRpcChannelImpl
+ implements CoprocessorRpcChannel {
+
+ RegionCoprocessorRpcChannel(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
+ byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
+ super(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs);
+ }
+
+ @Override
+ public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
+ ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
+ BlockingRpcCallback<Message> callback = new BlockingRpcCallback<>();
+ super.callMethod(method, c, request, responsePrototype, callback);
+ Message ret;
+ try {
+ ret = callback.get();
+ } catch (IOException e) {
+ setError(controller, e);
+ return;
+ }
+ if (c.failed()) {
+ setError(controller, c.getFailed());
+ }
+ done.run(ret);
+ }
+
+ @Override
+ public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
+ Message request, Message responsePrototype) throws ServiceException {
+ ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
+ BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
+ callMethod(method, c, request, responsePrototype, done);
+ Message ret;
+ try {
+ ret = done.get();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ if (c.failed()) {
+ setError(controller, c.getFailed());
+ throw new ServiceException(c.getFailed());
+ }
+ return ret;
+ }
+ }
+
+ @Override
+ public RegionCoprocessorRpcChannel coprocessorService(byte[] row) {
+ return new RegionCoprocessorRpcChannel(conn, getName(), null, row,
+ getRpcTimeout(TimeUnit.NANOSECONDS), getOperationTimeout(TimeUnit.NANOSECONDS));
+ }
+
+ /**
+ * Get the corresponding start keys and regions for an arbitrary range of keys.
+ *
+ * @param startKey Starting row in range, inclusive
+ * @param endKey Ending row in range
+ * @param includeEndKey true if endKey is inclusive, false if exclusive
+ * @return A pair of list of start keys and list of HRegionLocations that contain the specified
+ * range
+ * @throws IOException if a remote or network exception occurs
+ */
+ private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
+ final byte[] endKey, final boolean includeEndKey) throws IOException {
+ return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
+ }
+
+ /**
+ * Get the corresponding start keys and regions for an arbitrary range of keys.
+ *
+ * @param startKey Starting row in range, inclusive
+ * @param endKey Ending row in range
+ * @param includeEndKey true if endKey is inclusive, false if exclusive
+ * @param reload true to reload information or false to use cached information
+ * @return A pair of list of start keys and list of HRegionLocations that contain the specified
+ * range
+ * @throws IOException if a remote or network exception occurs
+ */
+ private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
+ final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException {
+ final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
+ if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
+ throw new IllegalArgumentException(
+ "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
+ }
+ List<byte[]> keysInRange = new ArrayList<>();
+ List<HRegionLocation> regionsInRange = new ArrayList<>();
+ byte[] currentKey = startKey;
+ do {
+ HRegionLocation regionLocation =
+ FutureUtils.get(conn.getRegionLocator(getName()).getRegionLocation(currentKey, reload));
+ keysInRange.add(currentKey);
+ regionsInRange.add(regionLocation);
+ currentKey = regionLocation.getRegion().getEndKey();
+ } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
+ (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 ||
+ (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
+ return new Pair<>(keysInRange, regionsInRange);
+ }
+
+ private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException {
+ if (start == null) {
+ start = HConstants.EMPTY_START_ROW;
+ }
+ if (end == null) {
+ end = HConstants.EMPTY_END_ROW;
+ }
+ return getKeysAndRegionsInRange(start, end, true).getFirst();
+ }
+
+ @FunctionalInterface
+ private interface StubCall<R> {
+ R call(RegionCoprocessorRpcChannel channel) throws Exception;
+ }
+
+ private <R> void coprocessorService(String serviceName, byte[] startKey, byte[] endKey,
+ Callback<R> callback, StubCall<R> call) throws Throwable {
+ // get regions covered by the row range
+ List<byte[]> keys = getStartKeysInRange(startKey, endKey);
+ Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (byte[] r : keys) {
+ RegionCoprocessorRpcChannel channel = coprocessorService(r);
+ Future<R> future = pool.submit(new Callable<R>() {
+ @Override
+ public R call() throws Exception {
+ R result = call.call(channel);
+ byte[] region = channel.getLastRegion();
+ if (callback != null) {
+ callback.update(region, r, result);
+ }
+ return result;
+ }
+ });
+ futures.put(r, future);
+ }
+ for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
+ try {
+ e.getValue().get();
+ } catch (ExecutionException ee) {
+ LOG.warn("Error calling coprocessor service " + serviceName + " for row " +
+ Bytes.toStringBinary(e.getKey()), ee);
+ throw ee.getCause();
+ } catch (InterruptedException ie) {
+ throw new InterruptedIOException("Interrupted calling coprocessor service " + serviceName +
+ " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
+ }
+ }
+ }
+
+ @Override
+ public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
+ byte[] endKey, Call<T, R> callable, Callback<R> callback) throws ServiceException, Throwable {
+ coprocessorService(service.getName(), startKey, endKey, callback, channel -> {
+ T instance = org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel);
+ return callable.call(instance);
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
+ Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
+ throws ServiceException, Throwable {
+ coprocessorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> {
+ return (R) channel.callBlockingMethod(methodDescriptor, null, request, responsePrototype);
+ });
+ }
+
+ @Override
+ public long getRpcTimeout(TimeUnit unit) {
+ return table.getRpcTimeout(unit);
+ }
+
+ @Override
+ public long getReadRpcTimeout(TimeUnit unit) {
+ return table.getReadRpcTimeout(unit);
+ }
+
+ @Override
+ public long getWriteRpcTimeout(TimeUnit unit) {
+ return table.getWriteRpcTimeout(unit);
+ }
+
+ @Override
+ public long getOperationTimeout(TimeUnit unit) {
+ return table.getOperationTimeout(unit);
+ }
+}
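The pattern above is the core of this change: every blocking Table method in TableOverAsyncTable
delegates to the corresponding AsyncTable call and waits on the returned CompletableFuture with
FutureUtils.get. A minimal caller-side sketch under the same assumptions (the table and row names
are made up; toConnection() is the AsyncConnection-to-Connection bridge this change adds):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    static void blockingGetOverAsync() throws Exception {
      // Create the async connection, then view it as a classic blocking Connection.
      try (AsyncConnection asyncConn =
          ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
        Table table = asyncConn.toConnection().getTable(TableName.valueOf("example"));
        // This call blocks internally on the async table's CompletableFuture.
        Result result = table.get(new Get(Bytes.toBytes("row-1")));
        System.out.println("empty? " + result.isEmpty());
      }
    }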
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
index 6ae7027..43d135b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
@@ -22,12 +22,18 @@ import com.google.protobuf.RpcChannel;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * Base interface which provides clients with an RPC connection to
- * call coprocessor endpoint {@link com.google.protobuf.Service}s.
+ * Base interface which provides clients with an RPC connection to call coprocessor endpoint
+ * {@link com.google.protobuf.Service}s.
+ *
* Note that clients should not use this class directly, except through
* {@link org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])}.
+ *
+ * @deprecated Please do not use this class any more, as it is too low level and is part of the
+ *             internal RPC framework for HBase. It will be removed in 4.0.0.
*/
+@Deprecated
@InterfaceAudience.Public
-public interface CoprocessorRpcChannel extends RpcChannel, BlockingRpcChannel {}
+public interface CoprocessorRpcChannel extends RpcChannel, BlockingRpcChannel {
+}
// This Interface is part of our public, client-facing API!!!
// This belongs in client package but it is exposed in our public API so we cannot relocate.
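As the updated Javadoc says, the supported way to obtain such a channel is
Table#coprocessorService(byte[]). A hedged sketch, where MyService stands in for a
protobuf-generated coprocessor endpoint (it is not a real HBase class):

    // The channel implements both RpcChannel and BlockingRpcChannel, so either the
    // async or the blocking generated stub can be layered on top of it.
    CoprocessorRpcChannel channel = table.coprocessorService(Bytes.toBytes("row-1"));
    MyService.BlockingInterface stub = MyService.newBlockingStub(channel);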
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/SimpleRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/SimpleRegistry.java
new file mode 100644
index 0000000..4d4d620
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/SimpleRegistry.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.FutureUtils;
+
+/**
+ * Simple cluster registry inserted in place of our usual zookeeper based one.
+ */
+class SimpleRegistry extends DoNothingAsyncRegistry {
+
+ private final ServerName metaHost;
+
+ volatile boolean closed = false;
+
+ private static final String META_HOST_CONFIG_NAME = "hbase.client.simple-registry.meta.host";
+
+ private static final String DEFAULT_META_HOST = "meta.example.org,16010,12345";
+
+ public static void setMetaHost(Configuration conf, ServerName metaHost) {
+ conf.set(META_HOST_CONFIG_NAME, metaHost.getServerName());
+ }
+
+ public SimpleRegistry(Configuration conf) {
+ super(conf);
+ this.metaHost = ServerName.valueOf(conf.get(META_HOST_CONFIG_NAME, DEFAULT_META_HOST));
+ }
+
+ @Override
+ public CompletableFuture<RegionLocations> getMetaRegionLocation() {
+ if (closed) {
+ return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
+ } else {
+ return CompletableFuture.completedFuture(new RegionLocations(
+ new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, metaHost)));
+ }
+ }
+
+ @Override
+ public CompletableFuture<String> getClusterId() {
+ if (closed) {
+ return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
+ } else {
+ return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Integer> getCurrentNrHRS() {
+ if (closed) {
+ return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
+ } else {
+ return CompletableFuture.completedFuture(1);
+ }
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+}
\ No newline at end of file
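SimpleRegistry is wired into a test configuration the same way TestClientNoCluster does below;
a minimal sketch (the meta host value is illustrative but follows the host,port,startcode form
that ServerName.valueOf expects):

    Configuration conf = HBaseConfiguration.create();
    // Swap the default zookeeper-based registry for the simple test registry.
    conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
    SimpleRegistry.setMetaHost(conf, ServerName.valueOf("meta.example.org,16010,12345"));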
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index d4781d1..0899fa1 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -1351,7 +1351,7 @@ public class TestAsyncProcess {
ap.previousTimeout = -1;
try {
- ht.existsAll(gets);
+ ht.exists(gets);
} catch (ClassCastException e) {
// No result response on this test.
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
index 647ea32..96bb846 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
@@ -33,12 +34,12 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-@Category({SmallTests.class, ClientTests.class})
+@Category({ SmallTests.class, ClientTests.class })
public class TestBufferedMutator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestBufferedMutator.class);
+ HBaseClassTestRule.forClass(TestBufferedMutator.class);
@Rule
public TestName name = new TestName();
@@ -55,10 +56,12 @@ public class TestBufferedMutator {
@Test
public void testAlternateBufferedMutatorImpl() throws IOException {
- BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(name.getMethodName()));
+ BufferedMutatorParams params =
+ new BufferedMutatorParams(TableName.valueOf(name.getMethodName()));
Configuration conf = HBaseConfiguration.create();
conf.set(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingAsyncRegistry.class.getName());
- try (Connection connection = ConnectionFactory.createConnection(conf)) {
+ try (ConnectionImplementation connection = ConnectionFactory.createConnectionImpl(conf, null,
+ UserProvider.instantiate(conf).getCurrent())) {
BufferedMutator bm = connection.getBufferedMutator(params);
// Assert we get default BM if nothing specified.
assertTrue(bm instanceof BufferedMutatorImpl);
@@ -70,7 +73,8 @@ public class TestBufferedMutator {
// Now try creating a Connection after setting an alterate BufferedMutator into
// the configuration and confirm we get what was expected.
conf.set(BufferedMutator.CLASSNAME_KEY, MyBufferedMutator.class.getName());
- try (Connection connection = ConnectionFactory.createConnection(conf)) {
+ try (Connection connection = ConnectionFactory.createConnectionImpl(conf, null,
+ UserProvider.instantiate(conf).getCurrent())) {
BufferedMutator bm = connection.getBufferedMutator(params);
assertTrue(bm instanceof MyBufferedMutator);
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
index 3cab09d..fd3a4f8 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.SortedMap;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -43,10 +42,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -118,37 +115,11 @@ public class TestClientNoCluster extends Configured implements Tool {
@Before
public void setUp() throws Exception {
this.conf = HBaseConfiguration.create();
- // Run my Connection overrides. Use my little ConnectionImplementation below which
+ // Run my Connection overrides. Use my little ConnectionImplementation below which
// allows me insert mocks and also use my Registry below rather than the default zk based
// one so tests run faster and don't have zk dependency.
this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
- }
-
- /**
- * Simple cluster registry inserted in place of our usual zookeeper based one.
- */
- static class SimpleRegistry extends DoNothingAsyncRegistry {
- final ServerName META_HOST = META_SERVERNAME;
-
- public SimpleRegistry(Configuration conf) {
- super(conf);
- }
-
- @Override
- public CompletableFuture<RegionLocations> getMetaRegionLocation() {
- return CompletableFuture.completedFuture(new RegionLocations(
- new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST)));
- }
-
- @Override
- public CompletableFuture<String> getClusterId() {
- return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT);
- }
-
- @Override
- public CompletableFuture<Integer> getCurrentNrHRS() {
- return CompletableFuture.completedFuture(1);
- }
+ SimpleRegistry.setMetaHost(conf, META_SERVERNAME);
}
/**
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 51c4553..6a9a8cf 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -521,7 +522,7 @@ public class TestHFileOutputFormat2 {
RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
setupMockStartKeys(regionLocator);
setupMockTableName(regionLocator);
- HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
assertEquals(job.getNumReduceTasks(), 4);
}
@@ -631,7 +632,7 @@ public class TestHFileOutputFormat2 {
assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
allTables.put(tableStrSingle, table);
- tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
+ tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
}
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
// Generate the bulk load files
@@ -817,7 +818,7 @@ public class TestHFileOutputFormat2 {
conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
HFileOutputFormat2.serializeColumnFamilyAttribute
(HFileOutputFormat2.compressionDetails,
- Arrays.asList(table.getTableDescriptor())));
+ Arrays.asList(table.getDescriptor())));
// read back family specific compression setting from the configuration
Map<String, Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
@@ -843,7 +844,7 @@ public class TestHFileOutputFormat2 {
.setBlockCacheEnabled(false)
.setTimeToLive(0));
}
- Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+ Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
}
/**
@@ -889,7 +890,7 @@ public class TestHFileOutputFormat2 {
familyToBloomType);
conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
- Arrays.asList(table.getTableDescriptor())));
+ Arrays.asList(table.getDescriptor())));
// read back family specific data block encoding settings from the
// configuration
@@ -917,7 +918,7 @@ public class TestHFileOutputFormat2 {
.setBlockCacheEnabled(false)
.setTimeToLive(0));
}
- Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+ Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
}
/**
@@ -961,7 +962,7 @@ public class TestHFileOutputFormat2 {
conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
HFileOutputFormat2.serializeColumnFamilyAttribute
(HFileOutputFormat2.blockSizeDetails, Arrays.asList(table
- .getTableDescriptor())));
+ .getDescriptor())));
// read back family specific data block encoding settings from the
// configuration
@@ -990,7 +991,7 @@ public class TestHFileOutputFormat2 {
.setBlockCacheEnabled(false)
.setTimeToLive(0));
}
- Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+ Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
}
/**
@@ -1035,7 +1036,7 @@ public class TestHFileOutputFormat2 {
Table table = Mockito.mock(Table.class);
setupMockColumnFamiliesForDataBlockEncoding(table,
familyToDataBlockEncoding);
- HTableDescriptor tableDescriptor = table.getTableDescriptor();
+ TableDescriptor tableDescriptor = table.getDescriptor();
conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
HFileOutputFormat2.serializeColumnFamilyAttribute
(HFileOutputFormat2.dataBlockEncodingDetails, Arrays
@@ -1067,7 +1068,7 @@ public class TestHFileOutputFormat2 {
.setBlockCacheEnabled(false)
.setTimeToLive(0));
}
- Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+ Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
}
/**
@@ -1125,7 +1126,7 @@ public class TestHFileOutputFormat2 {
Table table = Mockito.mock(Table.class);
RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
- Mockito.doReturn(htd).when(table).getTableDescriptor();
+ Mockito.doReturn(htd).when(table).getDescriptor();
for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
htd.addFamily(hcd);
}
@@ -1145,7 +1146,7 @@ public class TestHFileOutputFormat2 {
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
setupRandomGeneratorMapper(job, false);
- HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
FileOutputFormat.setOutputPath(job, dir);
context = createTestTaskAttemptContext(job);
HFileOutputFormat2 hof = new HFileOutputFormat2();
@@ -1411,10 +1412,8 @@ public class TestHFileOutputFormat2 {
Admin admin = c.getAdmin();
RegionLocator regionLocator = c.getRegionLocator(tname)) {
Path outDir = new Path("incremental-out");
- runIncrementalPELoad(conf,
- Arrays
- .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
- outDir, false);
+ runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin
+ .getDescriptor(tname), regionLocator)), outDir, false);
}
} else {
throw new RuntimeException(
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java
index eff26d7..af97793 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
@@ -240,5 +241,10 @@ public class TestMultiTableInputFormatBase {
@Override
public void clearRegionLocationCache() {
}
+
+ @Override
+ public AsyncConnection toAsyncConnection() {
+ return null;
+ }
}
}
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
index 944bd10..5fd5ccf 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
@@ -290,5 +291,10 @@ public class TestTableInputFormatBase {
@Override
public void clearRegionLocationCache() {
}
+
+ @Override
+ public AsyncConnection toAsyncConnection() {
+ throw new UnsupportedOperationException();
+ }
}
}
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java
index ecfe86d..a0deb7e 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,15 +19,13 @@
package org.apache.hadoop.hbase.rest;
import java.io.IOException;
-
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
-
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class ResourceBase implements Constants {
@@ -82,10 +79,9 @@ public class ResourceBase implements Constants {
StringUtils.stringifyException(exp) + CRLF)
.build());
}
- if (exp instanceof RetriesExhaustedWithDetailsException) {
- RetriesExhaustedWithDetailsException retryException =
- (RetriesExhaustedWithDetailsException) exp;
- processException(retryException.getCause(0));
+ if (exp instanceof RetriesExhaustedException) {
+ RetriesExhaustedException retryException = (RetriesExhaustedException) exp;
+ processException(retryException.getCause());
}
throw new WebApplicationException(
Response.status(Response.Status.SERVICE_UNAVAILABLE)
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/SchemaResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/SchemaResource.java
index 786fcb6..dcfe771 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/SchemaResource.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/SchemaResource.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.rest;
import java.io.IOException;
import java.util.Map;
-
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
@@ -35,20 +34,19 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.namespace.QName;
-
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.rest.model.ColumnSchemaModel;
import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class SchemaResource extends ResourceBase {
@@ -73,13 +71,9 @@ public class SchemaResource extends ResourceBase {
this.tableResource = tableResource;
}
- private HTableDescriptor getTableSchema() throws IOException,
- TableNotFoundException {
- Table table = servlet.getTable(tableResource.getName());
- try {
- return table.getTableDescriptor();
- } finally {
- table.close();
+ private HTableDescriptor getTableSchema() throws IOException, TableNotFoundException {
+ try (Table table = servlet.getTable(tableResource.getName())) {
+ return new HTableDescriptor(table.getDescriptor());
}
}
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index 29b48e1..4addfb4 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -23,19 +23,26 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
-
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -52,7 +59,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.rest.Constants;
@@ -63,19 +69,9 @@ import org.apache.hadoop.hbase.rest.model.ScannerModel;
import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -257,36 +253,6 @@ public class RemoteHTable implements Table {
}
@Override
- @Deprecated
- public HTableDescriptor getTableDescriptor() throws IOException {
- StringBuilder sb = new StringBuilder();
- sb.append('/');
- sb.append(Bytes.toString(name));
- sb.append('/');
- sb.append("schema");
- for (int i = 0; i < maxRetries; i++) {
- Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
- int code = response.getCode();
- switch (code) {
- case 200:
- TableSchemaModel schema = new TableSchemaModel();
- schema.getObjectFromMessage(response.getBody());
- return schema.getTableDescriptor();
- case 509:
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- throw (InterruptedIOException)new InterruptedIOException().initCause(e);
- }
- break;
- default:
- throw new IOException("schema request returned " + code);
- }
- }
- throw new IOException("schema request timed out");
- }
-
- @Override
public void close() throws IOException {
client.shutdown();
}
@@ -316,12 +282,13 @@ public class RemoteHTable implements Table {
int maxVersions = 1;
int count = 0;
- for(Get g:gets) {
+ for (Get g : gets) {
- if ( count == 0 ) {
+ if (count == 0) {
maxVersions = g.getMaxVersions();
} else if (g.getMaxVersions() != maxVersions) {
- LOG.warn("MaxVersions on Gets do not match, using the first in the list ("+maxVersions+")");
+ LOG.warn(
+ "MaxVersions on Gets do not match, using the first in the list (" + maxVersions + ")");
}
if (g.getFilter() != null) {
@@ -329,7 +296,7 @@ public class RemoteHTable implements Table {
}
rows[count] = g.getRow();
- count ++;
+ count++;
}
String spec = buildMultiRowSpec(rows, maxVersions);
@@ -346,7 +313,7 @@ public class RemoteHTable implements Table {
CellSetModel model = new CellSetModel();
model.getObjectFromMessage(response.getBody());
Result[] results = buildResultFromModel(model);
- if ( results.length > 0) {
+ if (results.length > 0) {
return results;
}
// fall through
@@ -357,7 +324,7 @@ public class RemoteHTable implements Table {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
- throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
break;
default:
@@ -393,21 +360,21 @@ public class RemoteHTable implements Table {
sb.append('/');
sb.append(toURLEncodedBytes(put.getRow()));
for (int i = 0; i < maxRetries; i++) {
- Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
- model.createProtobufOutput());
+ Response response =
+ client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
int code = response.getCode();
switch (code) {
- case 200:
- return;
- case 509:
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- throw (InterruptedIOException)new InterruptedIOException().initCause(e);
- }
- break;
- default:
- throw new IOException("put request failed with " + code);
+ case 200:
+ return;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ }
+ break;
+ default:
+ throw new IOException("put request failed with " + code);
}
}
throw new IOException("put request timed out");
@@ -419,24 +386,24 @@ public class RemoteHTable implements Table {
// ignores the row specification in the URI
// separate puts by row
- TreeMap<byte[], List<Cell>> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (Put put: puts) {
+ TreeMap<byte[], List<Cell>> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (Put put : puts) {
byte[] row = put.getRow();
List<Cell> cells = map.get(row);
if (cells == null) {
cells = new ArrayList<>();
map.put(row, cells);
}
- for (List<Cell> l: put.getFamilyCellMap().values()) {
+ for (List<Cell> l : put.getFamilyCellMap().values()) {
cells.addAll(l);
}
}
// build the cell set
CellSetModel model = new CellSetModel();
- for (Map.Entry<byte[], List<Cell>> e: map.entrySet()) {
+ for (Map.Entry<byte[], List<Cell>> e : map.entrySet()) {
RowModel row = new RowModel(e.getKey());
- for (Cell cell: e.getValue()) {
+ for (Cell cell : e.getValue()) {
row.addCell(new CellModel(cell));
}
model.addRow(row);
@@ -448,21 +415,21 @@ public class RemoteHTable implements Table {
sb.append(Bytes.toString(name));
sb.append("/$multiput"); // can be any nonexistent row
for (int i = 0; i < maxRetries; i++) {
- Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
- model.createProtobufOutput());
+ Response response =
+ client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
int code = response.getCode();
switch (code) {
- case 200:
- return;
- case 509:
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- throw (InterruptedIOException)new InterruptedIOException().initCause(e);
- }
- break;
- default:
- throw new IOException("multiput request failed with " + code);
+ case 200:
+ return;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ }
+ break;
+ default:
+ throw new IOException("multiput request failed with " + code);
}
}
throw new IOException("multiput request timed out");
@@ -505,7 +472,31 @@ public class RemoteHTable implements Table {
@Override
public TableDescriptor getDescriptor() throws IOException {
- return getTableDescriptor();
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ sb.append(Bytes.toString(name));
+ sb.append('/');
+ sb.append("schema");
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
+ int code = response.getCode();
+ switch (code) {
+ case 200:
+ TableSchemaModel schema = new TableSchemaModel();
+ schema.getObjectFromMessage(response.getBody());
+ return schema.getTableDescriptor();
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ }
+ break;
+ default:
+ throw new IOException("schema request returned " + code);
+ }
+ }
+ throw new IOException("schema request timed out");
}
class Scanner implements ResultScanner {
@@ -671,13 +662,6 @@ public class RemoteHTable implements Table {
return true;
}
- @Override
- @Deprecated
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
- byte[] value, Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, value, put);
- }
-
private boolean doCheckAndPut(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Put put) throws IOException {
// column to check-the-value
@@ -714,26 +698,6 @@ public class RemoteHTable implements Table {
throw new IOException("checkAndPut request timed out");
}
- @Override
- @Deprecated
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
- CompareOp compareOp, byte[] value, Put put) throws IOException {
- throw new IOException("checkAndPut for non-equal comparison not implemented");
- }
-
- @Override
- @Deprecated
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
- CompareOperator compareOp, byte[] value, Put put) throws IOException {
- throw new IOException("checkAndPut for non-equal comparison not implemented");
- }
-
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- byte[] value, Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, value, delete);
- }
-
private boolean doCheckAndDelete(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Delete delete) throws IOException {
Put put = new Put(row);
@@ -772,39 +736,11 @@ public class RemoteHTable implements Table {
}
@Override
- @Deprecated
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- CompareOp compareOp, byte[] value, Delete delete) throws IOException {
- throw new IOException("checkAndDelete for non-equal comparison not implemented");
- }
-
- @Override
- @Deprecated
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- CompareOperator compareOp, byte[] value, Delete delete) throws IOException {
- throw new IOException("checkAndDelete for non-equal comparison not implemented");
- }
-
- @Override
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return new CheckAndMutateBuilderImpl(row, family);
}
@Override
- @Deprecated
- public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
- throw new UnsupportedOperationException("checkAndMutate not implemented");
- }
-
- @Override
- @Deprecated
- public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareOperator compareOp, byte[] value, RowMutations rm) throws IOException {
- throw new UnsupportedOperationException("checkAndMutate not implemented");
- }
-
- @Override
public Result increment(Increment increment) throws IOException {
throw new IOException("Increment not supported");
}
@@ -877,69 +813,21 @@ public class RemoteHTable implements Table {
}
@Override
- @Deprecated
- public void setOperationTimeout(int operationTimeout) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- @Deprecated
- public int getOperationTimeout() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- @Deprecated
- public void setRpcTimeout(int rpcTimeout) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public long getReadRpcTimeout(TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
- @Deprecated
- public int getRpcTimeout() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public long getRpcTimeout(TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
- @Deprecated
- public int getReadRpcTimeout() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- @Deprecated
- public void setReadRpcTimeout(int readRpcTimeout) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public long getWriteRpcTimeout(TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
- @Deprecated
- public int getWriteRpcTimeout() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- @Deprecated
- public void setWriteRpcTimeout(int writeRpcTimeout) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public long getOperationTimeout(TimeUnit unit) {
throw new UnsupportedOperationException();
}
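The retry convention in RemoteHTable, including the relocated getDescriptor() above, is uniform:
200 is success, 509 means the REST server is overloaded (sleep and retry, up to maxRetries
attempts), and anything else is an error. Condensed as a sketch (the URI is illustrative;
client, sleepTime and maxRetries are the class's existing fields):

    for (int i = 0; i < maxRetries; i++) {
      Response response = client.get("/tablename/schema", Constants.MIMETYPE_PROTOBUF);
      int code = response.getCode();
      if (code == 200) {
        break; // success, use response.getBody()
      } else if (code == 509) {
        try {
          Thread.sleep(sleepTime); // server busy: back off, then retry
        } catch (InterruptedException e) {
          throw (InterruptedIOException) new InterruptedIOException().initCause(e);
        }
      } else {
        throw new IOException("request failed with " + code);
      }
    }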
diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java
index da09473..28d941c 100644
--- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java
+++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java
@@ -377,22 +377,20 @@ public class TestScannerResource {
assertEquals(404, response.getCode());
}
- // performs table scan during which the underlying table is disabled
- // assert that we get 410 (Gone)
@Test
public void testTableScanWithTableDisable() throws IOException {
+ TEST_UTIL.getAdmin().disableTable(TABLE_TO_BE_DISABLED);
ScannerModel model = new ScannerModel();
model.addColumn(Bytes.toBytes(COLUMN_1));
model.setCaching(1);
Response response = client.put("/" + TABLE_TO_BE_DISABLED + "/scanner",
Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
+ // We will only see the exception when we actually try to fetch the result.
assertEquals(201, response.getCode());
String scannerURI = response.getLocation();
assertNotNull(scannerURI);
- TEST_UTIL.getAdmin().disableTable(TABLE_TO_BE_DISABLED);
- response = client.get(scannerURI, Constants.MIMETYPE_PROTOBUF);
- assertTrue("got " + response.getCode(), response.getCode() == 410);
+ response = client.get(scannerURI, Constants.MIMETYPE_PROTOBUF);
+ assertEquals(410, response.getCode());
}
-
}
diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java
index c6f5195..269dc68 100644
--- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java
+++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.rest.HBaseRESTTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RestTests;
@@ -154,8 +155,8 @@ public class TestRemoteTable {
Table table = null;
try {
table = TEST_UTIL.getConnection().getTable(TABLE);
- HTableDescriptor local = table.getTableDescriptor();
- assertEquals(remoteTable.getTableDescriptor(), local);
+ TableDescriptor local = table.getDescriptor();
+ assertEquals(remoteTable.getDescriptor(), new HTableDescriptor(local));
} finally {
if (null != table) table.close();
}
@@ -504,7 +505,7 @@ public class TestRemoteTable {
assertTrue(Bytes.equals(VALUE_1, value1));
assertNull(value2);
assertTrue(remoteTable.exists(get));
- assertEquals(1, remoteTable.existsAll(Collections.singletonList(get)).length);
+ assertEquals(1, remoteTable.exists(Collections.singletonList(get)).length);
Delete delete = new Delete(ROW_1);
remoteTable.checkAndMutate(ROW_1, COLUMN_1).qualifier(QUALIFIER_1)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/SharedAsyncConnection.java
similarity index 53%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/client/SharedAsyncConnection.java
index d5fc58e..0f05b21 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/SharedAsyncConnection.java
@@ -18,138 +18,95 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.security.token.Token;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.yetus.audience.InterfaceAudience;
/**
- * Can be overridden in UT if you only want to implement part of the methods in
- * {@link AsyncClusterConnection}.
+ * Wraps an {@link AsyncConnection} so that it cannot be closed.
*/
-public class DummyAsyncClusterConnection implements AsyncClusterConnection {
+@InterfaceAudience.Private
+public class SharedAsyncConnection implements AsyncConnection {
+
+ private final AsyncConnection conn;
+
+ public SharedAsyncConnection(AsyncConnection conn) {
+ this.conn = conn;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return conn.isClosed();
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException("Shared connection");
+ }
@Override
public Configuration getConfiguration() {
- return null;
+ return conn.getConfiguration();
}
@Override
public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
- return null;
+ return conn.getRegionLocator(tableName);
}
@Override
public void clearRegionLocationCache() {
+ conn.clearRegionLocationCache();
}
@Override
public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
- return null;
+ return conn.getTableBuilder(tableName);
}
@Override
public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
ExecutorService pool) {
- return null;
+ return conn.getTableBuilder(tableName, pool);
}
@Override
public AsyncAdminBuilder getAdminBuilder() {
- return null;
+ return conn.getAdminBuilder();
}
@Override
public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
- return null;
+ return conn.getAdminBuilder(pool);
}
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
- return null;
+ return conn.getBufferedMutatorBuilder(tableName);
}
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
ExecutorService pool) {
- return null;
+ return conn.getBufferedMutatorBuilder(tableName, pool);
}
@Override
public CompletableFuture<Hbck> getHbck() {
- return null;
+ return conn.getHbck();
}
@Override
public Hbck getHbck(ServerName masterServer) throws IOException {
- return null;
- }
-
- @Override
- public boolean isClosed() {
- return false;
+ return conn.getHbck(masterServer);
}
@Override
- public void close() throws IOException {
+ public Connection toConnection() {
+ return new SharedConnection(conn.toConnection());
}
- @Override
- public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
- return null;
- }
-
- @Override
- public NonceGenerator getNonceGenerator() {
- return null;
- }
-
- @Override
- public RpcClient getRpcClient() {
- return null;
- }
-
- @Override
- public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
- boolean writeFlushWALMarker) {
- return null;
- }
-
- @Override
- public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
- List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs) {
- return null;
- }
-
- @Override
- public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
- boolean reload) {
- return null;
- }
-
- @Override
- public CompletableFuture<String> prepareBulkLoad(TableName tableName) {
- return null;
- }
-
- @Override
- public CompletableFuture<Boolean> bulkLoad(TableName tableName,
- List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
- String bulkToken, boolean copyFiles) {
- return null;
- }
-
- @Override
- public CompletableFuture<Boolean> cleanupBulkLoad(TableName tableName, String bulkToken) {
- return null;
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SharedConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/SharedConnection.java
similarity index 85%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/SharedConnection.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/client/SharedConnection.java
index de0c39b..f189a2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SharedConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/SharedConnection.java
@@ -15,22 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase;
+package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.BufferedMutatorParams;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Hbck;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.TableBuilder;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * Wraps a Connection to make it can't be closed or aborted.
+ * Wraps a {@link Connection} so that it cannot be closed or aborted.
*/
@InterfaceAudience.Private
public class SharedConnection implements Connection {
@@ -105,4 +100,9 @@ public class SharedConnection implements Connection {
public Hbck getHbck(ServerName masterServer) throws IOException {
return conn.getHbck(masterServer);
}
+
+ @Override
+ public AsyncConnection toAsyncConnection() {
+ return new SharedAsyncConnection(conn.toAsyncConnection());
+ }
}
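Both Shared wrappers exist so that code hosted inside the server (coprocessors, services) can use
the server's connection without being able to shut it down: close() fails loudly rather than
silently closing a shared resource. A sketch of the intended behavior (serverConnection is a
stand-in for the connection the server owns):

    Connection shared = new SharedConnection(serverConnection);
    shared.getTable(TableName.valueOf("example")); // delegates to the wrapped connection
    shared.close();                                // throws UnsupportedOperationException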
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0256324..385ae5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1855,7 +1855,7 @@ public class HMaster extends HRegionServer implements MasterServices {
List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
if (plans != null) {
for (NormalizationPlan plan : plans) {
- plan.execute(connection.getAdmin());
+ plan.execute(asyncClusterConnection.toConnection().getAdmin());
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {
@@ -3055,9 +3055,6 @@ public class HMaster extends HRegionServer implements MasterServices {
// this is what we want especially if the Master is in startup phase doing call outs to
// hbase:meta, etc. when cluster is down. Without this connection close, we'd have to wait on
// the rpc to timeout.
- if (this.connection != null) {
- this.connection.close();
- }
if (this.asyncClusterConnection != null) {
this.asyncClusterConnection.close();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 720f496..2680030 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -28,12 +28,12 @@ import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SharedConnection;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.SharedConnection;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9a0a62a..0de4c32b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -153,6 +154,7 @@ import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
@@ -258,19 +260,6 @@ public class HRegionServer extends HasThread implements
protected HeapMemoryManager hMemManager;
/**
- * Connection to be shared by services.
- *
- * Initialized at server startup and closed when server shuts down.
- *
- * Clients must never close it explicitly.
- *
- * Clients hosted by this Server should make use of this connection rather than create their own;
- * if they create their own, there is no way for the hosting server to shutdown ongoing client
- * RPCs.
- */
- protected Connection connection;
-
- /**
* The asynchronous cluster connection to be shared by services.
*/
protected AsyncClusterConnection asyncClusterConnection;
@@ -804,29 +793,7 @@ public class HRegionServer extends HasThread implements
}
/**
- * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to the
- * local server; i.e. a short-circuit Connection. Safe to use going to local or remote server.
- */
- private Connection createConnection() throws IOException {
- // Create a cluster connection that when appropriate, can short-circuit and go directly to the
- // local server if the request is to the local server bypassing RPC. Can be used for both local
- // and remote invocations.
- Connection conn =
- ConnectionUtils.createShortCircuitConnection(unsetClientZookeeperQuorum(), null,
- userProvider.getCurrent(), serverName, rpcServices, rpcServices);
- // This is used to initialize the batch thread pool inside the connection implementation.
- // When deploy a fresh cluster, we may first use the cluster connection in InitMetaProcedure,
- // which will be executed inside the PEWorker, and then the batch thread pool will inherit the
- // thread group of PEWorker, which will be destroy when shutting down the ProcedureExecutor. It
- // will cause lots of procedure related UTs to fail, so here let's initialize it first, no harm.
- conn.getTable(TableName.META_TABLE_NAME).close();
- return conn;
- }
-
- /**
* Run test on configured codecs to make sure supporting libs are in place.
- * @param c
- * @throws IOException
*/
private static void checkCodecs(final Configuration c) throws IOException {
// check to see if the codec list is available:
@@ -848,11 +815,12 @@ public class HRegionServer extends HasThread implements
* Setup our cluster connection if not already initialized.
*/
protected final synchronized void setupClusterConnection() throws IOException {
- if (connection == null) {
- connection = createConnection();
+ if (asyncClusterConnection == null) {
+ Configuration conf = unsetClientZookeeperQuorum();
+ InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
+ User user = userProvider.getCurrent();
asyncClusterConnection =
- ClusterConnectionFactory.createAsyncClusterConnection(unsetClientZookeeperQuorum(),
- new InetSocketAddress(this.rpcServices.isa.getAddress(), 0), userProvider.getCurrent());
+ ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user);
}
}
@@ -1121,15 +1089,6 @@ public class HRegionServer extends HasThread implements
LOG.info("stopping server " + this.serverName);
}
- if (this.connection != null && !connection.isClosed()) {
- try {
- this.connection.close();
- } catch (IOException e) {
- // Although the {@link Closeable} interface throws an {@link
- // IOException}, in reality, the implementation would never do that.
- LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
- }
- }
if (this.asyncClusterConnection != null) {
try {
this.asyncClusterConnection.close();
@@ -2194,7 +2153,7 @@ public class HRegionServer extends HasThread implements
@Override
public Connection getConnection() {
- return this.connection;
+ return getAsyncConnection().toConnection();
}
@Override
@@ -2300,8 +2259,8 @@ public class HRegionServer extends HasThread implements
}
} else {
try {
- MetaTableAccessor.updateRegionLocation(connection,
- hris[0], serverName, openSeqNum, masterSystemTime);
+ MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
+ serverName, openSeqNum, masterSystemTime);
} catch (IOException e) {
LOG.info("Failed to update meta", e);
return false;
@@ -2331,7 +2290,7 @@ public class HRegionServer extends HasThread implements
// Keep looping till we get an error. We want to send reports even though server is going down.
// Only go down if clusterConnection is null. It is set to null almost as the last thing the
// HRegionServer does as it goes down.
- while (this.connection != null && !this.connection.isClosed()) {
+ while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
RegionServerStatusService.BlockingInterface rss = rssStub;
try {
if (rss == null) {
@@ -3784,7 +3743,7 @@ public class HRegionServer extends HasThread implements
@Override
public void unassign(byte[] regionName) throws IOException {
- connection.getAdmin().unassign(regionName, false);
+ FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
}
@Override
@@ -3833,8 +3792,7 @@ public class HRegionServer extends HasThread implements
@Override
public Connection createConnection(Configuration conf) throws IOException {
User user = UserProvider.instantiate(conf).getCurrent();
- return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
- this.rpcServices, this.rpcServices);
+ return ConnectionFactory.createConnection(conf, null, user);
}
public void executeProcedure(long procId, RSProcedureCallable callable) {
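Taken together, the HRegionServer hunks above drop the blocking short-circuit
Connection and leave a single shared AsyncClusterConnection, from which
synchronous views are derived on demand. A minimal sketch of the resulting
pattern, using only APIs visible in this patch (the class and method names of
the sketch itself are illustrative, not part of the commit):

import java.io.IOException;

import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.FutureUtils;

public class AsyncClusterConnectionSketch {

  // 'asyncConn' stands in for the server's shared asyncClusterConnection field.
  static Connection synchronousView(AsyncClusterConnection asyncConn) {
    // Synchronous callers get a Connection backed by the async one; this is
    // what getConnection() above now returns instead of the removed field.
    return asyncConn.toConnection();
  }

  static void unassignBlocking(AsyncClusterConnection asyncConn, byte[] regionName)
      throws IOException {
    // FutureUtils.get() blocks on the CompletableFuture and rethrows the
    // failure cause as an IOException, as in the unassign() hunk above.
    FutureUtils.get(asyncConn.getAdmin().unassign(regionName, false));
  }
}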
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 16fd332..1506ed5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RawCellBuilder;
import org.apache.hadoop.hbase.RawCellBuilderFactory;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SharedConnection;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
@@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SharedConnection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
index 42a4e00..f15312a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
@@ -25,8 +25,8 @@ import com.google.protobuf.Service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SharedConnection;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.SharedConnection;
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
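Both coprocessor hosts now import SharedConnection from its new home in
org.apache.hadoop.hbase.client. From a coprocessor's point of view nothing
changes: the environment still hands out the hosting server's connection,
wrapped so that closing it is refused. A rough usage sketch (the table name
and the read are illustrative, not taken from this patch):

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class SharedConnectionSketch {

  static Result readThroughSharedConnection(RegionCoprocessorEnvironment env)
      throws IOException {
    // getConnection() returns the server's connection wrapped in
    // SharedConnection; never call close() on it from coprocessor code.
    Connection conn = env.getConnection();
    try (Table table = conn.getTable(TableName.valueOf("my_table"))) {
      return table.get(new Get(Bytes.toBytes("row1")));
    }
  }
}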
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 0949207..11a7a03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -2203,15 +2203,14 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
// Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
// request can be granted.
- TableName [] sns = null;
try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
- sns = admin.listTableNames();
- if (sns == null) return;
- for (TableName tableName: tableNamesList) {
+ for (TableName tableName : tableNamesList) {
// Skip checks for a table that does not exist
- if (!admin.tableExists(tableName)) continue;
- requirePermission(ctx, "getTableDescriptors", tableName, null, null,
- Action.ADMIN, Action.CREATE);
+ if (!admin.tableExists(tableName)) {
+ continue;
+ }
+ requirePermission(ctx, "getTableDescriptors", tableName, null, null, Action.ADMIN,
+ Action.CREATE);
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
deleted file mode 100644
index d095fa3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.util;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides the ability to create multiple Connection instances and allows processing a batch
- * of actions using HTable.doBatchWithCallback()
- */
-@InterfaceAudience.Private
-public class MultiHConnection {
- private static final Logger LOG = LoggerFactory.getLogger(MultiHConnection.class);
- private Connection[] connections;
- private final Object connectionsLock = new Object();
- private final int noOfConnections;
- private ExecutorService batchPool;
-
- /**
- * Create multiple Connection instances and initialize a thread pool executor
- * @param conf configuration
- * @param noOfConnections total no of Connections to create
- * @throws IOException if IO failure occurs
- */
- public MultiHConnection(Configuration conf, int noOfConnections)
- throws IOException {
- this.noOfConnections = noOfConnections;
- synchronized (this.connectionsLock) {
- connections = new Connection[noOfConnections];
- for (int i = 0; i < noOfConnections; i++) {
- Connection conn = ConnectionFactory.createConnection(conf);
- connections[i] = conn;
- }
- }
- createBatchPool(conf);
- }
-
- /**
- * Close the open connections and shutdown the batchpool
- */
- public void close() {
- synchronized (connectionsLock) {
- if (connections != null) {
- for (Connection conn : connections) {
- if (conn != null) {
- try {
- conn.close();
- } catch (IOException e) {
- LOG.info("Got exception in closing connection", e);
- } finally {
- conn = null;
- }
- }
- }
- connections = null;
- }
- }
- if (this.batchPool != null && !this.batchPool.isShutdown()) {
- this.batchPool.shutdown();
- try {
- if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
- this.batchPool.shutdownNow();
- }
- } catch (InterruptedException e) {
- this.batchPool.shutdownNow();
- }
- }
-
- }
-
- /**
- * Randomly pick a connection and process the batch of actions for a given table
- * @param actions the actions
- * @param tableName table name
- * @param results the results array
- * @param callback to run when results are in
- * @throws IOException If IO failure occurs
- */
- public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName,
- Object[] results, Batch.Callback<R> callback) throws IOException {
- // Currently used by RegionStateStore
- HTable.doBatchWithCallback(actions, results, callback,
- connections[ThreadLocalRandom.current().nextInt(noOfConnections)], batchPool, tableName);
- }
-
- // Copied from ConnectionImplementation.getBatchPool()
- // We should get rid of this when Connection.processBatchCallback is un-deprecated and provides
- // an API to manage a batch pool
- private void createBatchPool(Configuration conf) {
- // Use the same config for keep alive as in ConnectionImplementation.getBatchPool();
- int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256);
- if (maxThreads == 0) {
- maxThreads = Runtime.getRuntime().availableProcessors() * 8;
- }
- long keepAliveTime = conf.getLong("hbase.multihconnection.threads.keepalivetime", 60);
- LinkedBlockingQueue<Runnable> workQueue =
- new LinkedBlockingQueue<>(maxThreads
- * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
- HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
- ThreadPoolExecutor tpe =
- new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
- Threads.newDaemonThreadFactory("MultiHConnection" + "-shared-"));
- tpe.allowCoreThreadTimeOut(true);
- this.batchPool = tpe;
- }
-
-}
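With Connection now layered over AsyncConnection, a private pool of
connections plus a hand-managed batch executor is no longer needed; the Table
API batches through the shared connection's own resources. A rough equivalent
of the deleted processBatchCallback(), sketched against a single long-lived
Connection (the method shape mirrors the deleted one and is not code from
this patch):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;

public class BatchWithoutMultiHConnection {

  static <R> void processBatchCallback(Connection connection, TableName tableName,
      List<? extends Row> actions, Object[] results, Batch.Callback<R> callback)
      throws IOException, InterruptedException {
    // Table manages its own pooling, so no Connection[] array and no
    // dedicated ThreadPoolExecutor are required any more.
    try (Table table = connection.getTable(tableName)) {
      table.batchCallback(actions, results, callback);
    }
  }
}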
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/table.jsp b/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
index e52d8b2..14d233a 100644
--- a/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
+++ b/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
@@ -31,6 +31,7 @@
import="java.util.TreeMap"
import="org.apache.commons.lang3.StringEscapeUtils"
import="org.apache.hadoop.conf.Configuration"
+ import="org.apache.hadoop.hbase.HTableDescriptor"
import="org.apache.hadoop.hbase.HColumnDescriptor"
import="org.apache.hadoop.hbase.HConstants"
import="org.apache.hadoop.hbase.HRegionLocation"
@@ -131,7 +132,7 @@
if ( fqtn != null ) {
try {
table = master.getConnection().getTable(TableName.valueOf(fqtn));
- if (table.getTableDescriptor().getRegionReplication() > 1) {
+ if (table.getDescriptor().getRegionReplication() > 1) {
tableHeader = "