From commits-return-83637-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Sun Mar 3 06:06:30 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id AFCFF18067E for ; Sun, 3 Mar 2019 07:06:27 +0100 (CET) Received: (qmail 51912 invoked by uid 500); 3 Mar 2019 06:06:26 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 50726 invoked by uid 99); 3 Mar 2019 06:06:25 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Mar 2019 06:06:25 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8092987764; Sun, 3 Mar 2019 06:06:24 +0000 (UTC) Date: Sun, 03 Mar 2019 06:06:32 +0000 To: "commits@hbase.apache.org" Subject: [hbase] 09/11: HBASE-21778 Remove the usage of the locateRegion related methods in ClusterConnection MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: zhangduo@apache.org In-Reply-To: <155159318339.14714.10534412582727639998@gitbox.apache.org> References: <155159318339.14714.10534412582727639998@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: hbase X-Git-Refname: refs/heads/HBASE-21512 X-Git-Reftype: branch X-Git-Rev: 69469f3ff930bc9babd52a93509c5d96561606fc X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190303060624.8092987764@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-21512 in repository https://gitbox.apache.org/repos/asf/hbase.git commit 69469f3ff930bc9babd52a93509c5d96561606fc Author: Duo Zhang AuthorDate: Fri Feb 1 16:40:34 2019 +0800 HBASE-21778 Remove the usage of the locateRegion related methods in ClusterConnection Signed-off-by: Guanghao Zhang --- .../apache/hadoop/hbase/client/AsyncProcess.java | 4 +- .../hadoop/hbase/client/BufferedMutatorImpl.java | 3 +- .../hbase/client/ClientAsyncPrefetchScanner.java | 4 +- .../apache/hadoop/hbase/client/ClientScanner.java | 6 +- .../hadoop/hbase/client/ClientServiceCallable.java | 4 +- .../hadoop/hbase/client/ClientSimpleScanner.java | 4 +- .../hadoop/hbase/client/ClusterConnection.java | 149 --------------------- .../hbase/client/ConnectionImplementation.java | 127 ++++++++++++------ .../hadoop/hbase/client/FlushRegionCallable.java | 6 +- .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 10 +- .../org/apache/hadoop/hbase/client/HTable.java | 4 +- .../hbase/client/RegionAdminServiceCallable.java | 12 +- .../hadoop/hbase/client/RegionServerCallable.java | 10 +- .../hadoop/hbase/client/ReversedClientScanner.java | 4 +- .../client/RpcRetryingCallerWithReadReplicas.java | 36 +++-- .../hbase/client/ScannerCallableWithReplicas.java | 27 ++-- .../hadoop/hbase/client/TestAsyncProcess.java | 98 +++++++------- .../TestAsyncProcessWithRegionException.java | 10 +- .../hadoop/hbase/client/TestBufferedMutator.java | 2 +- .../hadoop/hbase/client/TestClientScanner.java | 8 +- .../hbase/client/TestReversedScannerCallable.java | 3 +- ...nTestTimeBoundedRequestsWithRegionReplicas.java | 13 +- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 34 ++--- .../apache/hadoop/hbase/util/RegionSplitter.java | 19 ++- .../hbase/client/HConnectionTestingUtility.java | 105 +++++++-------- .../apache/hadoop/hbase/client/TestCISleep.java | 4 +- .../hbase/client/TestHBaseAdminNoCluster.java | 7 +- .../TestMetaTableAccessorNoCluster.java | 47 ++++--- .../hadoop/hbase/client/TestMetaWithReplicas.java | 34 ++--- .../hbase/client/TestReplicaWithCluster.java | 49 ++++--- .../hadoop/hbase/client/TestReplicasClient.java | 11 +- .../hbase/client/TestSeparateClientZKCluster.java | 4 +- .../client/TestSnapshotCloneIndependence.java | 2 +- .../master/TestMetaAssignmentWithStopMaster.java | 48 +++---- .../TestLoadIncrementalHFilesSplitRecovery.java | 27 ++-- .../hadoop/hbase/util/BaseTestHBaseFsck.java | 3 +- .../hadoop/hbase/util/MultiThreadedAction.java | 23 ++-- .../hadoop/hbase/util/MultiThreadedReader.java | 11 +- .../hadoop/hbase/util/MultiThreadedWriterBase.java | 8 +- 39 files changed, 431 insertions(+), 549 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index de7449b..b0f863f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -136,7 +136,7 @@ class AsyncProcess { // TODO: many of the fields should be made private final long id; - final ClusterConnection connection; + final ConnectionImplementation connection; private final RpcRetryingCallerFactory rpcCallerFactory; final RpcControllerFactory rpcFactory; @@ -161,7 +161,7 @@ class AsyncProcess { public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms"; private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000; private final int periodToLog; - AsyncProcess(ClusterConnection hc, Configuration conf, + AsyncProcess(ConnectionImplementation hc, Configuration conf, RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index f0c8da4..922611b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -140,7 +140,8 @@ public class BufferedMutatorImpl implements BufferedMutator { params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout()); this.ap = ap; } - BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, + + BufferedMutatorImpl(ConnectionImplementation conn, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcFactory, BufferedMutatorParams params) { this(conn, params, // puts need to track errors globally due to how the APIs currently work. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index e5af871..1a9178c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -67,11 +67,11 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { private final Condition notFull = lock.newCondition(); public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, - ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + ConnectionImplementation connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int replicaCallTimeoutMicroSecondScan) throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - replicaCallTimeoutMicroSecondScan); + replicaCallTimeoutMicroSecondScan); } @VisibleForTesting diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index cae98aa..bbc3fb4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -68,7 +68,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // Keep lastResult returned successfully in case we have to reset scanner. protected Result lastResult = null; protected final long maxScannerResultSize; - private final ClusterConnection connection; + private final ConnectionImplementation connection; protected final TableName tableName; protected final int scannerTimeout; protected RpcRetryingCaller caller; @@ -93,7 +93,7 @@ public abstract class ClientScanner extends AbstractClientScanner { * @throws IOException */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + ConnectionImplementation connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { if (LOG.isTraceEnabled()) { @@ -137,7 +137,7 @@ public abstract class ClientScanner extends AbstractClientScanner { initCache(); } - protected ClusterConnection getConnection() { + protected ConnectionImplementation getConnection() { return this.connection; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java index f118e7a..67ba838 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java @@ -34,9 +34,9 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; public abstract class ClientServiceCallable extends RegionServerCallable { - public ClientServiceCallable(Connection connection, TableName tableName, byte [] row, + public ClientServiceCallable(Connection connection, TableName tableName, byte[] row, RpcController rpcController, int priority) { - super(connection, tableName, row, rpcController, priority); + super((ConnectionImplementation) connection, tableName, row, rpcController, priority); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java index 7e9c4b9..e5d7b97 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -37,11 +37,11 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @InterfaceAudience.Private public class ClientSimpleScanner extends ClientScanner { public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name, - ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + ConnectionImplementation connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int replicaCallTimeoutMicroSecondScan) throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - replicaCallTimeoutMicroSecondScan); + replicaCallTimeoutMicroSecondScan); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 9b2222b..7828ef0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -20,11 +20,8 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; -import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; @@ -95,135 +92,6 @@ public interface ClusterConnection extends Connection { TableState getTableState(TableName tableName) throws IOException; /** - * Find the location of the region of tableName that row - * lives in. - * @param tableName name of the table row is in - * @param row row key you're trying to find the region of - * @return HRegionLocation that describes where to find the region in - * question - * @throws IOException if a remote or network exception occurs - */ - HRegionLocation locateRegion(final TableName tableName, - final byte [] row) throws IOException; - - /** - * @deprecated {@link #clearRegionLocationCache()} instead. - */ - @Deprecated - default void clearRegionCache() { - clearRegionLocationCache(); - } - - void cacheLocation(final TableName tableName, final RegionLocations location); - - /** - * Allows flushing the region cache of all locations that pertain to - * tableName - * @param tableName Name of the table whose regions we are to remove from - * cache. - */ - void clearRegionCache(final TableName tableName); - - /** - * Deletes cached locations for the specific region. - * @param location The location object for the region, to be purged from cache. - */ - void deleteCachedRegionLocation(final HRegionLocation location); - - /** - * Find the location of the region of tableName that row - * lives in, ignoring any value that might be in the cache. - * @param tableName name of the table row is in - * @param row row key you're trying to find the region of - * @return HRegionLocation that describes where to find the region in - * question - * @throws IOException if a remote or network exception occurs - */ - HRegionLocation relocateRegion(final TableName tableName, - final byte [] row) throws IOException; - - /** - * Find the location of the region of tableName that row - * lives in, ignoring any value that might be in the cache. - * @param tableName name of the table row is in - * @param row row key you're trying to find the region of - * @param replicaId the replicaId of the region - * @return RegionLocations that describe where to find the region in - * question - * @throws IOException if a remote or network exception occurs - */ - RegionLocations relocateRegion(final TableName tableName, - final byte [] row, int replicaId) throws IOException; - - /** - * Update the location cache. This is used internally by HBase, in most cases it should not be - * used by the client application. - * @param tableName the table name - * @param regionName the region name - * @param rowkey the row - * @param exception the exception if any. Can be null. - * @param source the previous location - */ - void updateCachedLocations(TableName tableName, byte[] regionName, byte[] rowkey, - Object exception, ServerName source); - - /** - * Gets the location of the region of regionName. - * @param regionName name of the region to locate - * @return HRegionLocation that describes where to find the region in - * question - * @throws IOException if a remote or network exception occurs - */ - HRegionLocation locateRegion(final byte[] regionName) - throws IOException; - - /** - * Gets the locations of all regions in the specified table, tableName. - * @param tableName table to get regions of - * @return list of region locations for all regions of table - * @throws IOException if IO failure occurs - */ - List locateRegions(final TableName tableName) throws IOException; - - /** - * Gets the locations of all regions in the specified table, tableName. - * @param tableName table to get regions of - * @param useCache Should we use the cache to retrieve the region information. - * @param offlined True if we are to include offlined regions, false and we'll leave out offlined - * regions from returned list. - * @return list of region locations for all regions of table - * @throws IOException if IO failure occurs - */ - List locateRegions(final TableName tableName, - final boolean useCache, - final boolean offlined) throws IOException; - - /** - * - * @param tableName table to get regions of - * @param row the row - * @param useCache Should we use the cache to retrieve the region information. - * @param retry do we retry - * @return region locations for this row. - * @throws IOException if IO failure occurs - */ - RegionLocations locateRegion(TableName tableName, - byte[] row, boolean useCache, boolean retry) throws IOException; - - /** - * - * @param tableName table to get regions of - * @param row the row - * @param useCache Should we use the cache to retrieve the region information. - * @param retry do we retry - * @param replicaId the replicaId for the region - * @return region locations for this row. - * @throws IOException if IO failure occurs - */ - RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry, - int replicaId) throws IOException; - - /** * Returns a {@link MasterKeepAliveConnection} to the active master */ MasterKeepAliveConnection getMaster() throws IOException; @@ -253,23 +121,6 @@ public interface ClusterConnection extends Connection { ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException; /** - * Find region location hosting passed row - * @param tableName table name - * @param row Row to find. - * @param reload If true do not use cache, otherwise bypass. - * @return Location of row. - * @throws IOException if a remote or network exception occurs - */ - HRegionLocation getRegionLocation(TableName tableName, byte[] row, boolean reload) - throws IOException; - - /** - * Clear any caches that pertain to server name sn. - * @param sn A server name - */ - void clearCaches(final ServerName sn); - - /** * @return Nonce generator for this ClusterConnection; may be null if disabled in configuration. */ NonceGenerator getNonceGenerator(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index ff2ff2f..f262020 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -611,9 +611,16 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return true; } - @Override - public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row, - boolean reload) throws IOException { + /** + * Find region location hosting passed row + * @param tableName table name + * @param row Row to find. + * @param reload If true do not use cache, otherwise bypass. + * @return Location of row. + * @throws IOException if a remote or network exception occurs + */ + HRegionLocation getRegionLocation(final TableName tableName, final byte[] row, boolean reload) + throws IOException { return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row); } @@ -683,13 +690,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } } - @Override - public HRegionLocation locateRegion(final byte[] regionName) throws IOException { - RegionLocations locations = locateRegion(RegionInfo.getTable(regionName), - RegionInfo.getStartKey(regionName), false, true); - return locations == null ? null : locations.getRegionLocation(); - } - private boolean isDeadServer(ServerName sn) { if (clusterStatusListener == null) { return false; @@ -698,13 +698,26 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } } - @Override - public List locateRegions(TableName tableName) throws IOException { + /** + * Gets the locations of all regions in the specified table, tableName. + * @param tableName table to get regions of + * @return list of region locations for all regions of table + * @throws IOException if IO failure occurs + */ + List locateRegions(TableName tableName) throws IOException { return locateRegions(tableName, false, true); } - @Override - public List locateRegions(TableName tableName, boolean useCache, + /** + * Gets the locations of all regions in the specified table, tableName. + * @param tableName table to get regions of + * @param useCache Should we use the cache to retrieve the region information. + * @param offlined True if we are to include offlined regions, false and we'll leave out offlined + * regions from returned list. + * @return list of region locations for all regions of table + * @throws IOException if IO failure occurs + */ + List locateRegions(TableName tableName, boolean useCache, boolean offlined) throws IOException { List regions; if (TableName.isMetaTableName(tableName)) { @@ -729,24 +742,44 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return locations; } - @Override - public HRegionLocation locateRegion(final TableName tableName, final byte[] row) - throws IOException { + /** + * Find the location of the region of tableName that row lives in. + * @param tableName name of the table row is in + * @param row row key you're trying to find the region of + * @return HRegionLocation that describes where to find the region in question + * @throws IOException if a remote or network exception occurs + */ + HRegionLocation locateRegion(final TableName tableName, final byte[] row) throws IOException { RegionLocations locations = locateRegion(tableName, row, true, true); return locations == null ? null : locations.getRegionLocation(); } - @Override - public HRegionLocation relocateRegion(final TableName tableName, final byte[] row) - throws IOException { + /** + * Find the location of the region of tableName that row lives in, ignoring any + * value that might be in the cache. + * @param tableName name of the table row is in + * @param row row key you're trying to find the region of + * @return HRegionLocation that describes where to find the region in question + * @throws IOException if a remote or network exception occurs + */ + HRegionLocation relocateRegion(final TableName tableName, final byte[] row) throws IOException { RegionLocations locations = relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); return locations == null ? null : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID); } - @Override - public RegionLocations relocateRegion(final TableName tableName, + /** + * Find the location of the region of tableName that row + * lives in, ignoring any value that might be in the cache. + * @param tableName name of the table row is in + * @param row row key you're trying to find the region of + * @param replicaId the replicaId of the region + * @return RegionLocations that describe where to find the region in + * question + * @throws IOException if a remote or network exception occurs + */ + RegionLocations relocateRegion(final TableName tableName, final byte [] row, int replicaId) throws IOException{ // Since this is an explicit request not to use any caching, finding // disabled tables should not be desirable. This will ensure that an exception is thrown when @@ -758,14 +791,30 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return locateRegion(tableName, row, false, true, replicaId); } - @Override - public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, + /** + * @param tableName table to get regions of + * @param row the row + * @param useCache Should we use the cache to retrieve the region information. + * @param retry do we retry + * @return region locations for this row. + * @throws IOException if IO failure occurs + */ + RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, boolean retry) throws IOException { return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID); } - @Override - public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, + /** + * + * @param tableName table to get regions of + * @param row the row + * @param useCache Should we use the cache to retrieve the region information. + * @param retry do we retry + * @param replicaId the replicaId for the region + * @return region locations for this row. + * @throws IOException if IO failure occurs + */ + RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { checkClosed(); if (tableName == null || tableName.getName().length == 0) { @@ -969,8 +1018,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { * @param tableName The table name. * @param location the new location */ - @Override - public void cacheLocation(final TableName tableName, final RegionLocations location) { + void cacheLocation(final TableName tableName, final RegionLocations location) { metaCache.cacheLocation(tableName, location); } @@ -984,15 +1032,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return metaCache.getCachedLocation(tableName, row); } - public void clearRegionCache(final TableName tableName, byte[] row) { + void clearRegionCache(final TableName tableName, byte[] row) { metaCache.clearCache(tableName, row); } - /* - * Delete all cached entries of a table that maps to a specific location. + /** + * Clear any caches that pertain to server name sn. + * @param sn A server name */ - @Override - public void clearCaches(final ServerName serverName) { + void clearCaches(final ServerName serverName) { metaCache.clearCache(serverName); } @@ -1001,8 +1049,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable { metaCache.clearCache(); } - @Override - public void clearRegionCache(final TableName tableName) { + /** + * Allows flushing the region cache of all locations that pertain to tableName + * @param tableName Name of the table whose regions we are to remove from cache. + */ + void clearRegionCache(final TableName tableName) { metaCache.clearCache(tableName); } @@ -1853,8 +1904,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { cacheLocation(hri.getTable(), source, newHrl); } - @Override - public void deleteCachedRegionLocation(final HRegionLocation location) { + void deleteCachedRegionLocation(final HRegionLocation location) { metaCache.clearCache(location); } @@ -1866,8 +1916,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { * or wrapped or both RegionMovedException * @param source server that is the source of the location update. */ - @Override - public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey, + void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey, final Object exception, final ServerName source) { if (rowkey == null || tableName == null) { LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) + diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java index bb265a4..d881fe0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; - import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -28,6 +27,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; @@ -42,7 +42,7 @@ public class FlushRegionCallable extends RegionAdminServiceCallable implements RetryingCallable< protected final RpcControllerFactory rpcControllerFactory; private HBaseRpcController controller = null; - protected final ClusterConnection connection; + protected final ConnectionImplementation connection; protected HRegionLocation location; protected final TableName tableName; protected final byte[] row; protected final int replicaId; - public RegionAdminServiceCallable(ClusterConnection connection, + public RegionAdminServiceCallable(ConnectionImplementation connection, RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] row) { this(connection, rpcControllerFactory, null, tableName, row); } - public RegionAdminServiceCallable(ClusterConnection connection, + public RegionAdminServiceCallable(ConnectionImplementation connection, RpcControllerFactory rpcControllerFactory, HRegionLocation location, TableName tableName, byte[] row) { this(connection, rpcControllerFactory, location, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); } - public RegionAdminServiceCallable(ClusterConnection connection, + public RegionAdminServiceCallable(ConnectionImplementation connection, RpcControllerFactory rpcControllerFactory, HRegionLocation location, TableName tableName, byte[] row, int replicaId) { this.connection = connection; @@ -138,8 +138,8 @@ public abstract class RegionAdminServiceCallable implements RetryingCallable< return ConnectionUtils.getPauseTime(pause, tries); } - public static RegionLocations getRegionLocations( - ClusterConnection connection, TableName tableName, byte[] row, + private static RegionLocations getRegionLocations( + ConnectionImplementation connection, TableName tableName, byte[] row, boolean useCache, int replicaId) throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { RegionLocations rl; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index f709c44..264304e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.util.Bytes; // Public but should be package private only it is used by MetaTableAccessor. FIX!! @InterfaceAudience.Private public abstract class RegionServerCallable implements RetryingCallable { - private final Connection connection; + private final ConnectionImplementation connection; private final TableName tableName; private final byte[] row; /** @@ -75,12 +75,12 @@ public abstract class RegionServerCallable implements RetryingCallable * @param tableName Table name to which row belongs. * @param row The row we want in tableName. */ - public RegionServerCallable(Connection connection, TableName tableName, byte [] row, + public RegionServerCallable(ConnectionImplementation connection, TableName tableName, byte [] row, RpcController rpcController) { this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS); } - public RegionServerCallable(Connection connection, TableName tableName, byte [] row, + public RegionServerCallable(ConnectionImplementation connection, TableName tableName, byte [] row, RpcController rpcController, int priority) { super(); this.connection = connection; @@ -162,8 +162,8 @@ public abstract class RegionServerCallable implements RetryingCallable /** * @return {@link ClusterConnection} instance used by this Callable. */ - protected ClusterConnection getConnection() { - return (ClusterConnection) this.connection; + protected ConnectionImplementation getConnection() { + return this.connection; } protected HRegionLocation getLocation() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 53b9641..34c24c0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -46,11 +46,11 @@ public class ReversedClientScanner extends ClientScanner { * @throws IOException */ public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + ConnectionImplementation connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); + primaryOperationTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 4a31cff..b2d0600 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.Collections; @@ -27,24 +29,22 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - -import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; /** * Caller that goes to replica if the primary region does no answer within a configurable @@ -58,7 +58,7 @@ public class RpcRetryingCallerWithReadReplicas { LoggerFactory.getLogger(RpcRetryingCallerWithReadReplicas.class); protected final ExecutorService pool; - protected final ClusterConnection cConnection; + protected final ConnectionImplementation cConnection; protected final Configuration conf; protected final Get get; protected final TableName tableName; @@ -71,7 +71,7 @@ public class RpcRetryingCallerWithReadReplicas { public RpcRetryingCallerWithReadReplicas( RpcControllerFactory rpcControllerFactory, TableName tableName, - ClusterConnection cConnection, final Get get, + ConnectionImplementation cConnection, final Get get, ExecutorService pool, int retries, int operationTimeout, int rpcTimeout, int timeBeforeReplicas) { this.rpcControllerFactory = rpcControllerFactory; @@ -185,19 +185,14 @@ public class RpcRetryingCallerWithReadReplicas { } else { // We cannot get the primary replica location, it is possible that the region // server hosting meta is down, it needs to proceed to try cached replicas. - if (cConnection instanceof ConnectionImplementation) { - rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow()); - if (rl == null) { - // No cached locations - throw e; - } - - // Primary replica location is not known, skip primary replica - skipPrimary = true; - } else { - // For completeness + rl = cConnection.getCachedLocation(tableName, get.getRow()); + if (rl == null) { + // No cached locations throw e; } + + // Primary replica location is not known, skip primary replica + skipPrimary = true; } } @@ -316,9 +311,8 @@ public class RpcRetryingCallerWithReadReplicas { } static RegionLocations getRegionLocations(boolean useCache, int replicaId, - ClusterConnection cConnection, TableName tableName, byte[] row) + ConnectionImplementation cConnection, TableName tableName, byte[] row) throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { - RegionLocations rl; try { if (useCache) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index bcb81f7..27e5f87 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.client; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.HashSet; @@ -31,17 +29,18 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; -import org.apache.hadoop.hbase.util.Pair; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** * This class has the logic for handling scanners for regions with and without replicas. @@ -60,7 +59,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class); volatile ScannerCallable currentScannerCallable; AtomicBoolean replicaSwitched = new AtomicBoolean(false); - final ClusterConnection cConnection; + final ConnectionImplementation cConnection; protected final ExecutorService pool; protected final int timeBeforeReplicas; private final Scan scan; @@ -74,7 +73,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { private boolean someRPCcancelled = false; //required for testing purposes only private int regionReplication = 0; - public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, + public ScannerCallableWithReplicas(TableName tableName, ConnectionImplementation cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, int retries, int scannerTimeout, int caching, Configuration conf, RpcRetryingCaller caller) { @@ -151,19 +150,13 @@ class ScannerCallableWithReplicas implements RetryingCallable { RegionLocations rl = null; try { rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, - RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, - currentScannerCallable.getRow()); + RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, + currentScannerCallable.getRow()); } catch (RetriesExhaustedException | DoNotRetryIOException e) { // We cannot get the primary replica region location, it is possible that the region server // hosting meta table is down, it needs to proceed to try cached replicas directly. - if (cConnection instanceof ConnectionImplementation) { - rl = ((ConnectionImplementation) cConnection) - .getCachedLocation(tableName, currentScannerCallable.getRow()); - if (rl == null) { - throw e; - } - } else { - // For completeness + rl = cConnection.getCachedLocation(tableName, currentScannerCallable.getRow()); + if (rl == null) { throw e; } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 71b21ac..70d10ca 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -175,17 +175,17 @@ public class TestAsyncProcess { return r; } - public MyAsyncProcess(ClusterConnection hc, Configuration conf) { - super(hc, conf, - new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); + public MyAsyncProcess(ConnectionImplementation hc, Configuration conf) { + super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); service = Executors.newFixedThreadPool(5); this.conf = conf; } - public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { + public MyAsyncProcess(ConnectionImplementation hc, Configuration conf, + AtomicInteger nbThreads) { super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); - service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, - new SynchronousQueue<>(), new CountingThreadFactory(nbThreads)); + service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), + new CountingThreadFactory(nbThreads)); } public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, @@ -326,7 +326,8 @@ public class TestAsyncProcess { private final IOException ioe; - public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) { + public AsyncProcessWithFailure(ConnectionImplementation hc, Configuration conf, + IOException ioe) { super(hc, conf); this.ioe = ioe; serverTrackerTimeout = 1L; @@ -376,7 +377,7 @@ public class TestAsyncProcess { customPrimarySleepMs.put(server, primaryMs); } - public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) { + public MyAsyncProcessWithReplicas(ConnectionImplementation hc, Configuration conf) { super(hc, conf); } @@ -622,7 +623,7 @@ public class TestAsyncProcess { } private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); final long defaultHeapSizePerRequest = conn.getConfiguration().getLong( @@ -718,7 +719,7 @@ public class TestAsyncProcess { @Test public void testSubmit() throws Exception { - ClusterConnection hc = createHConnection(); + ConnectionImplementation hc = createConnectionImpl(); MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); List puts = new ArrayList<>(1); @@ -730,7 +731,7 @@ public class TestAsyncProcess { @Test public void testSubmitWithCB() throws Exception { - ClusterConnection hc = createHConnection(); + ConnectionImplementation hc = createConnectionImpl(); final AtomicInteger updateCalled = new AtomicInteger(0); Batch.Callback cb = new Batch.Callback() { @Override @@ -751,7 +752,7 @@ public class TestAsyncProcess { @Test public void testSubmitBusyRegion() throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, @@ -779,7 +780,7 @@ public class TestAsyncProcess { @Test public void testSubmitBusyRegionServer() throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); @@ -810,7 +811,7 @@ public class TestAsyncProcess { @Test public void testFail() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); + MyAsyncProcess ap = new MyAsyncProcess(createConnectionImpl(), CONF); List puts = new ArrayList<>(1); Put p = createPut(1, false); @@ -836,7 +837,7 @@ public class TestAsyncProcess { @Test public void testSubmitTrue() throws IOException { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); @@ -885,7 +886,7 @@ public class TestAsyncProcess { @Test public void testFailAndSuccess() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); + MyAsyncProcess ap = new MyAsyncProcess(createConnectionImpl(), CONF); List puts = new ArrayList<>(3); puts.add(createPut(1, false)); @@ -912,7 +913,7 @@ public class TestAsyncProcess { @Test public void testFlush() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); + MyAsyncProcess ap = new MyAsyncProcess(createConnectionImpl(), CONF); List puts = new ArrayList<>(3); puts.add(createPut(1, false)); @@ -929,7 +930,7 @@ public class TestAsyncProcess { @Test public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { - ClusterConnection hc = createHConnection(); + ConnectionImplementation hc = createConnectionImpl(); MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); testTaskCount(ap); } @@ -939,7 +940,7 @@ public class TestAsyncProcess { Configuration copyConf = new Configuration(CONF); copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); Mockito.when(conn.getConfiguration()).thenReturn(copyConf); Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); Mockito.when(conn.getBackoffPolicy()).thenReturn(bp); @@ -979,7 +980,7 @@ public class TestAsyncProcess { @Test public void testMaxTask() throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, @@ -1038,8 +1039,8 @@ public class TestAsyncProcess { } } - private ClusterConnection createHConnection() throws IOException { - ClusterConnection hc = createHConnectionCommon(); + private ConnectionImplementation createConnectionImpl() throws IOException { + ConnectionImplementation hc = createConnectionImplCommon(); setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); @@ -1049,8 +1050,8 @@ public class TestAsyncProcess { return hc; } - private ClusterConnection createHConnectionWithReplicas() throws IOException { - ClusterConnection hc = createHConnectionCommon(); + private ConnectionImplementation createConnectionImplWithReplicas() throws IOException { + ConnectionImplementation hc = createConnectionImplCommon(); setMockLocation(hc, DUMMY_BYTES_1, hrls1); setMockLocation(hc, DUMMY_BYTES_2, hrls2); setMockLocation(hc, DUMMY_BYTES_3, hrls3); @@ -1069,16 +1070,16 @@ public class TestAsyncProcess { return hc; } - private static void setMockLocation(ClusterConnection hc, byte[] row, + private static void setMockLocation(ConnectionImplementation hc, byte[] row, RegionLocations result) throws IOException { - Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), - Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); - Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), - Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); + Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), + Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); + Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), + Mockito.anyBoolean())).thenReturn(result); } - private ClusterConnection createHConnectionCommon() { - ClusterConnection hc = Mockito.mock(ClusterConnection.class); + private ConnectionImplementation createConnectionImplCommon() { + ConnectionImplementation hc = Mockito.mock(ConnectionImplementation.class); NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(hc.getNonceGenerator()).thenReturn(ng); @@ -1089,7 +1090,7 @@ public class TestAsyncProcess { @Test public void testHTablePutSuccess() throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); @@ -1106,7 +1107,7 @@ public class TestAsyncProcess { @Test public void testSettingWriteBufferPeriodicFlushParameters() throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); checkPeriodicFlushParameters(conn, ap, @@ -1152,7 +1153,7 @@ public class TestAsyncProcess { @Test public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); @@ -1219,7 +1220,7 @@ public class TestAsyncProcess { @Test public void testBufferedMutatorImplWithSharedPool() throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap); @@ -1230,7 +1231,7 @@ public class TestAsyncProcess { @Test public void testFailedPutAndNewPut() throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE) .writeBufferSize(0); @@ -1275,7 +1276,7 @@ public class TestAsyncProcess { @Test public void testBatch() throws IOException, InterruptedException { - ClusterConnection conn = new MyConnectionImpl(CONF); + ConnectionImplementation conn = new MyConnectionImpl(CONF); HTable ht = (HTable) conn.getTable(DUMMY_TABLE); ht.multiAp = new MyAsyncProcess(conn, CONF); @@ -1306,7 +1307,7 @@ public class TestAsyncProcess { @Test public void testErrorsServers() throws IOException { Configuration configuration = new Configuration(CONF); - ClusterConnection conn = new MyConnectionImpl(configuration); + ConnectionImplementation conn = new MyConnectionImpl(configuration); MyAsyncProcess ap = new MyAsyncProcess(conn, configuration); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); @@ -1337,7 +1338,7 @@ public class TestAsyncProcess { Configuration copyConf = new Configuration(CONF); copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout); copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); - ClusterConnection conn = new MyConnectionImpl(copyConf); + ConnectionImplementation conn = new MyConnectionImpl(copyConf); MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) { ht.multiAp = ap; @@ -1370,7 +1371,7 @@ public class TestAsyncProcess { @Test public void testErrors() throws IOException { - ClusterConnection conn = new MyConnectionImpl(CONF); + ConnectionImplementation conn = new MyConnectionImpl(CONF); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test")); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); @@ -1394,7 +1395,7 @@ public class TestAsyncProcess { @Test public void testCallQueueTooLarge() throws IOException { - ClusterConnection conn = new MyConnectionImpl(CONF); + ConnectionImplementation conn = new MyConnectionImpl(CONF); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException()); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); @@ -1609,7 +1610,7 @@ public class TestAsyncProcess { // TODO: this is kind of timing dependent... perhaps it should detect from createCaller // that the replica call has happened and that way control the ordering. Configuration conf = new Configuration(); - ClusterConnection conn = createHConnectionWithReplicas(); + ConnectionImplementation conn = createConnectionImplWithReplicas(); conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000); if (retries >= 0) { conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); @@ -1707,16 +1708,15 @@ public class TestAsyncProcess { } static class AsyncProcessForThrowableCheck extends AsyncProcess { - public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { - super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory( - conf)); + public AsyncProcessForThrowableCheck(ConnectionImplementation hc, Configuration conf) { + super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); } } @Test public void testUncheckedException() throws Exception { // Test the case pool.submit throws unchecked exception - ClusterConnection hc = createHConnection(); + ConnectionImplementation hc = createConnectionImpl(); MyThreadPoolExecutor myPool = new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); @@ -1748,7 +1748,7 @@ public class TestAsyncProcess { final int retries = 1; myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause); myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); - ClusterConnection conn = new MyConnectionImpl(myConf); + ConnectionImplementation conn = new MyConnectionImpl(myConf); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException()); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); @@ -1807,7 +1807,7 @@ public class TestAsyncProcess { @Test public void testRetryWithExceptionClearsMetaCache() throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); Configuration myConf = conn.getConfiguration(); myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); @@ -1840,7 +1840,7 @@ public class TestAsyncProcess { @Test public void testQueueRowAccess() throws Exception { - ClusterConnection conn = createHConnection(); + ConnectionImplementation conn = createConnectionImpl(); BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000)); Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java index ffc4e51..2c24aaa 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java @@ -175,8 +175,8 @@ public class TestAsyncProcessWithRegionException { } } - private static ClusterConnection createHConnection() throws IOException { - ClusterConnection hc = Mockito.mock(ClusterConnection.class); + private static ConnectionImplementation createHConnection() throws IOException { + ConnectionImplementation hc = Mockito.mock(ConnectionImplementation.class); NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(hc.getNonceGenerator()).thenReturn(ng); @@ -190,8 +190,8 @@ public class TestAsyncProcessWithRegionException { return hc; } - private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result) - throws IOException { + private static void setMockLocation(ConnectionImplementation hc, byte[] row, + RegionLocations result) throws IOException { Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), @@ -201,7 +201,7 @@ public class TestAsyncProcessWithRegionException { private static class MyAsyncProcess extends AsyncProcess { private final ExecutorService service = Executors.newFixedThreadPool(5); - MyAsyncProcess(ClusterConnection hc, Configuration conf) { + MyAsyncProcess(ConnectionImplementation hc, Configuration conf) { super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java index f8e1295..f0375e2 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java @@ -48,7 +48,7 @@ public class TestBufferedMutator { * Just to prove that I can insert a BM other than default. */ public static class MyBufferedMutator extends BufferedMutatorImpl { - MyBufferedMutator(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, + MyBufferedMutator(ConnectionImplementation conn, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcFactory, BufferedMutatorParams params) { super(conn, rpcCallerFactory, rpcFactory, params); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 48ca751..9f1f6f3 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -71,7 +71,7 @@ public class TestClientScanner { ExecutorService pool; Configuration conf; - ClusterConnection clusterConn; + ConnectionImplementation clusterConn; RpcRetryingCallerFactory rpcFactory; RpcControllerFactory controllerFactory; @@ -80,7 +80,7 @@ public class TestClientScanner { @Before public void setup() throws IOException { - clusterConn = Mockito.mock(ClusterConnection.class); + clusterConn = Mockito.mock(ConnectionImplementation.class); rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); controllerFactory = Mockito.mock(RpcControllerFactory.class); pool = Executors.newSingleThreadExecutor(); @@ -103,11 +103,11 @@ public class TestClientScanner { private boolean initialized = false; public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + ConnectionImplementation connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); + primaryOperationTimeout); } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestReversedScannerCallable.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestReversedScannerCallable.java index 1b554f7..7eb2b94 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestReversedScannerCallable.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestReversedScannerCallable.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; @@ -46,7 +45,7 @@ public class TestReversedScannerCallable { HBaseClassTestRule.forClass(TestReversedScannerCallable.class); @Mock - private ClusterConnection connection; + private ConnectionImplementation connection; @Mock private Scan scan; @Mock diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java index b5e99d2..b9cb167 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java @@ -31,13 +31,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.IntegrationTestIngest; import org.apache.hadoop.hbase.IntegrationTestingUtility; -import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; @@ -52,6 +51,7 @@ import org.junit.Assert; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** @@ -350,10 +350,11 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr numReadFailures.addAndGet(1); // fail the test for (Result r : results) { LOG.error("FAILED FOR " + r); - RegionLocations rl = ((ClusterConnection)connection). - locateRegion(tableName, r.getRow(), true, true); - HRegionLocation locations[] = rl.getRegionLocations(); - for (HRegionLocation h : locations) { + List locs; + try (RegionLocator locator = connection.getRegionLocator(tableName)) { + locs = locator.getRegionLocations(r.getRow()); + } + for (HRegionLocation h : locs) { LOG.error("LOCATION " + h); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 9e5f9e8..34c846d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -1948,35 +1949,34 @@ public class HBaseFsck extends Configured implements Closeable { * Record the location of the hbase:meta region as found in ZooKeeper. */ private boolean recordMetaRegion() throws IOException { - RegionLocations rl = connection.locateRegion(TableName.META_TABLE_NAME, - HConstants.EMPTY_START_ROW, false, false); - if (rl == null) { - errors.reportError(ERROR_CODE.NULL_META_REGION, - "META region was not found in ZooKeeper"); + List locs; + try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) { + locs = locator.getRegionLocations(HConstants.EMPTY_START_ROW, true); + } + if (locs == null || locs.isEmpty()) { + errors.reportError(ERROR_CODE.NULL_META_REGION, "META region was not found in ZooKeeper"); return false; } - for (HRegionLocation metaLocation : rl.getRegionLocations()) { + for (HRegionLocation metaLocation : locs) { // Check if Meta region is valid and existing - if (metaLocation == null ) { - errors.reportError(ERROR_CODE.NULL_META_REGION, - "META region location is null"); + if (metaLocation == null) { + errors.reportError(ERROR_CODE.NULL_META_REGION, "META region location is null"); return false; } - if (metaLocation.getRegionInfo() == null) { - errors.reportError(ERROR_CODE.NULL_META_REGION, - "META location regionInfo is null"); + if (metaLocation.getRegion() == null) { + errors.reportError(ERROR_CODE.NULL_META_REGION, "META location regionInfo is null"); return false; } if (metaLocation.getHostname() == null) { - errors.reportError(ERROR_CODE.NULL_META_REGION, - "META location hostName is null"); + errors.reportError(ERROR_CODE.NULL_META_REGION, "META location hostName is null"); return false; } ServerName sn = metaLocation.getServerName(); - MetaEntry m = new MetaEntry(metaLocation.getRegionInfo(), sn, EnvironmentEdgeManager.currentTime()); - HbckInfo hbckInfo = regionInfoMap.get(metaLocation.getRegionInfo().getEncodedName()); + MetaEntry m = + new MetaEntry(metaLocation.getRegion(), sn, EnvironmentEdgeManager.currentTime()); + HbckInfo hbckInfo = regionInfoMap.get(metaLocation.getRegion().getEncodedName()); if (hbckInfo == null) { - regionInfoMap.put(metaLocation.getRegionInfo().getEncodedName(), new HbckInfo(m)); + regionInfoMap.put(metaLocation.getRegion().getEncodedName(), new HbckInfo(m)); } else { hbckInfo.metaEntry = m; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java index 1b58634..a779d36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java @@ -44,22 +44,21 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.NoServerForRegionException; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; -import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; @@ -707,7 +706,7 @@ public class RegionSplitter { Path tableDir = tableDirAndSplitFile.getFirst(); FileSystem fs = tableDir.getFileSystem(connection.getConfiguration()); // Clear the cache to forcibly refresh region information - ((ClusterConnection)connection).clearRegionCache(); + connection.clearRegionLocationCache(); TableDescriptor htd = null; try (Table table = connection.getTable(tableName)) { htd = table.getDescriptor(); @@ -768,7 +767,7 @@ public class RegionSplitter { } catch (NoServerForRegionException nsfre) { LOG.debug("No Server Exception thrown for: " + splitAlgo.rowToStr(start)); physicalSplitting.add(region); - ((ClusterConnection)connection).clearRegionCache(); + connection.clearRegionLocationCache(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index a8beab6..2a5a395 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -20,44 +20,43 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.Threads; import org.mockito.Mockito; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** - * {@link ClusterConnection} testing utility. + * {@link ConnectionImplementation} testing utility. */ public class HConnectionTestingUtility { /* - * Not part of {@link HBaseTestingUtility} because this class is not - * in same package as {@link ClusterConnection}. Would have to reveal ugly - * {@link ConnectionImplementation} innards to HBaseTestingUtility to give it access. + * Not part of {@link HBaseTestingUtility} because this class is not in same package as {@link + * ConnectionImplementation}. Would have to reveal ugly {@link ConnectionImplementation} innards + * to HBaseTestingUtility to give it access. */ /** - * Get a Mocked {@link ClusterConnection} that goes with the passed conf - * configuration instance. Minimally the mock will return - * <code>conf</conf> when {@link ClusterConnection#getConfiguration()} is invoked. - * Be sure to shutdown the connection when done by calling - * {@link Connection#close()} else it will stick around; this is probably not what you want. + * Get a Mocked {@link ConnectionImplementation} that goes with the passed conf + * configuration instance. Minimally the mock will return <code>conf</conf> when + * {@link ConnectionImplementation#getConfiguration()} is invoked. Be sure to shutdown the + * connection when done by calling {@link Connection#close()} else it will stick around; this is + * probably not what you want. * @param conf configuration - * @return ClusterConnection object for conf + * @return ConnectionImplementation object for conf * @throws ZooKeeperConnectionException */ - public static ClusterConnection getMockedConnection(final Configuration conf) + public static ConnectionImplementation getMockedConnection(final Configuration conf) throws ZooKeeperConnectionException { ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class); Mockito.when(connection.getConfiguration()).thenReturn(conf); @@ -70,37 +69,30 @@ public class HConnectionTestingUtility { } /** - * Calls {@link #getMockedConnection(Configuration)} and then mocks a few - * more of the popular {@link ClusterConnection} methods so they do 'normal' - * operation (see return doc below for list). Be sure to shutdown the - * connection when done by calling {@link Connection#close()} else it will stick around; - * this is probably not what you want. - * + * Calls {@link #getMockedConnection(Configuration)} and then mocks a few more of the popular + * {@link ConnectionImplementation} methods so they do 'normal' operation (see return doc below + * for list). Be sure to shutdown the connection when done by calling {@link Connection#close()} + * else it will stick around; this is probably not what you want. * @param conf Configuration to use - * @param admin An AdminProtocol; can be null but is usually - * itself a mock. - * @param client A ClientProtocol; can be null but is usually - * itself a mock. - * @param sn ServerName to include in the region location returned by this - * connection - * @param hri RegionInfo to include in the location returned when - * getRegionLocator is called on the mocked connection + * @param admin An AdminProtocol; can be null but is usually itself a mock. + * @param client A ClientProtocol; can be null but is usually itself a mock. + * @param sn ServerName to include in the region location returned by this connection + * @param hri RegionInfo to include in the location returned when getRegionLocator is called on + * the mocked connection * @return Mock up a connection that returns a {@link Configuration} when - * {@link ClusterConnection#getConfiguration()} is called, a 'location' when - * {@link ClusterConnection#getRegionLocation(org.apache.hadoop.hbase.TableName, byte[], boolean)} - * is called, - * and that returns the passed {@link AdminProtos.AdminService.BlockingInterface} instance when - * {@link ClusterConnection#getAdmin(ServerName)} is called, returns the passed - * {@link ClientProtos.ClientService.BlockingInterface} instance when - * {@link ClusterConnection#getClient(ServerName)} is called (Be sure to call - * {@link Connection#close()} when done with this mocked Connection. - * @throws IOException + * {@link ConnectionImplementation#getConfiguration()} is called, a 'location' when + * {@link ConnectionImplementation#getRegionLocation(TableName,byte[], boolean)} + * is called, and that returns the passed + * {@link AdminProtos.AdminService.BlockingInterface} instance when + * {@link ConnectionImplementation#getAdmin(ServerName)} is called, returns the passed + * {@link ClientProtos.ClientService.BlockingInterface} instance when + * {@link ConnectionImplementation#getClient(ServerName)} is called (Be sure to call + * {@link Connection#close()} when done with this mocked Connection. */ - public static ClusterConnection getMockedConnectionAndDecorate(final Configuration conf, + public static ConnectionImplementation getMockedConnectionAndDecorate(final Configuration conf, final AdminProtos.AdminService.BlockingInterface admin, - final ClientProtos.ClientService.BlockingInterface client, - final ServerName sn, final RegionInfo hri) - throws IOException { + final ClientProtos.ClientService.BlockingInterface client, final ServerName sn, + final RegionInfo hri) throws IOException { ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class); Mockito.when(c.getConfiguration()).thenReturn(conf); Mockito.doNothing().when(c).close(); @@ -141,18 +133,17 @@ public class HConnectionTestingUtility { } /** - * Get a Mockito spied-upon {@link ClusterConnection} that goes with the passed - * conf configuration instance. - * Be sure to shutdown the connection when done by calling - * {@link Connection#close()} else it will stick around; this is probably not what you want. + * Get a Mockito spied-upon {@link ConnectionImplementation} that goes with the passed + * conf configuration instance. Be sure to shutdown the connection when done by + * calling {@link Connection#close()} else it will stick around; this is probably not what you + * want. * @param conf configuration - * @return ClusterConnection object for conf - * @throws ZooKeeperConnectionException - * [Dead link]: See also - * {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)} + * @return ConnectionImplementation object for conf + * @throws ZooKeeperConnectionException [Dead link]: See also + * {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)} */ - public static ClusterConnection getSpiedConnection(final Configuration conf) - throws IOException { + public static ConnectionImplementation getSpiedConnection(final Configuration conf) + throws IOException { ConnectionImplementation connection = Mockito.spy(new ConnectionImplementation(conf, null, null)); return connection; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCISleep.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCISleep.java index 4e5665d..cd27a30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCISleep.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCISleep.java @@ -111,8 +111,8 @@ public class TestCISleep extends AbstractTestCITimeout { } RegionAdminServiceCallable regionAdminServiceCallable = - new RegionAdminServiceCallable((ClusterConnection) TEST_UTIL.getConnection(), - new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, FAM_NAM) { + new RegionAdminServiceCallable((ConnectionImplementation) TEST_UTIL.getConnection(), + new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, FAM_NAM) { @Override public Object call(HBaseRpcController controller) throws Exception { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java index a1026a9..ad73592 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java @@ -95,8 +95,9 @@ public class TestHBaseAdminNoCluster { configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count); // Get mocked connection. Getting the connection will register it so when HBaseAdmin is // constructed with same configuration, it will find this mocked connection. - ClusterConnection connection = HConnectionTestingUtility.getMockedConnection(configuration); - // Mock so we get back the master interface. Make it so when createTable is called, we throw + ConnectionImplementation connection = + HConnectionTestingUtility.getMockedConnection(configuration); + // Mock so we get back the master interface. Make it so when createTable is called, we throw // the PleaseHoldException. MasterKeepAliveConnection masterAdmin = Mockito.mock(MasterKeepAliveConnection.class); Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(), @@ -292,7 +293,7 @@ public class TestHBaseAdminNoCluster { final int count = 10; configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count); - ClusterConnection connection = mock(ClusterConnection.class); + ConnectionImplementation connection = mock(ConnectionImplementation.class); when(connection.getConfiguration()).thenReturn(configuration); MasterKeepAliveConnection masterAdmin = Mockito.mock(MasterKeepAliveConnection.class, new Answer() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaTableAccessorNoCluster.java similarity index 86% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaTableAccessorNoCluster.java index 5d36ea9..53f5064 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaTableAccessorNoCluster.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase; +package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -26,11 +26,19 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.NavigableMap; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.HConnectionTestingUtility; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.RegionInfoBuilder; -import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; @@ -112,8 +120,8 @@ public class TestMetaTableAccessorNoCluster { assertTrue(hri == null); // OK, give it what it expects kvs.clear(); - kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f, - HConstants.REGIONINFO_QUALIFIER, RegionInfo.toByteArray(RegionInfoBuilder.FIRST_META_REGIONINFO))); + kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f, HConstants.REGIONINFO_QUALIFIER, + RegionInfo.toByteArray(RegionInfoBuilder.FIRST_META_REGIONINFO))); hri = MetaTableAccessor.getRegionInfo(Result.create(kvs)); assertNotNull(hri); assertTrue(RegionInfo.COMPARATOR.compare(hri, RegionInfoBuilder.FIRST_META_REGIONINFO) == 0); @@ -123,8 +131,6 @@ public class TestMetaTableAccessorNoCluster { * Test that MetaTableAccessor will ride over server throwing * "Server not running" IOEs. * @see HBASE-3446 - * @throws IOException - * @throws InterruptedException */ @Test public void testRideOverServerNotRunning() @@ -135,7 +141,7 @@ public class TestMetaTableAccessorNoCluster { // This is a servername we use in a few places below. ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis()); - ClusterConnection connection = null; + ConnectionImplementation connection = null; try { // Mock an ClientProtocol. Our mock implementation will fail a few // times when we go to open a scanner. @@ -190,26 +196,27 @@ public class TestMetaTableAccessorNoCluster { // Return the RegionLocations object when locateRegion // The ugly format below comes of 'Important gotcha on spying real objects!' from // http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html - Mockito.doReturn(rl).when - (connection).locateRegion((TableName)Mockito.any(), (byte[])Mockito.any(), - Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt()); + Mockito.doReturn(rl).when(connection).locateRegion((TableName) Mockito.any(), + (byte[]) Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt()); // Now shove our HRI implementation into the spied-upon connection. - Mockito.doReturn(implementation). - when(connection).getClient(Mockito.any()); + Mockito.doReturn(implementation).when(connection).getClient(Mockito.any()); // Scan meta for user tables and verify we got back expected answer. NavigableMap hris = MetaTableAccessor.getServerUserRegions(connection, sn); assertEquals(1, hris.size()); - assertTrue(RegionInfo.COMPARATOR.compare(hris.firstEntry().getKey(), RegionInfoBuilder.FIRST_META_REGIONINFO) == 0); + assertTrue(RegionInfo.COMPARATOR.compare(hris.firstEntry().getKey(), + RegionInfoBuilder.FIRST_META_REGIONINFO) == 0); assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow())); // Finally verify that scan was called four times -- three times // with exception and then on 4th attempt we succeed - Mockito.verify(implementation, Mockito.times(4)). - scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any()); + Mockito.verify(implementation, Mockito.times(4)).scan((RpcController) Mockito.any(), + (ScanRequest) Mockito.any()); } finally { - if (connection != null && !connection.isClosed()) connection.close(); + if (connection != null && !connection.isClosed()) { + connection.close(); + } zkw.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java index 6c633a2..e5280a6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java @@ -253,7 +253,7 @@ public class TestMetaWithReplicas { util.getHBaseClusterInterface().killRegionServer(primary); util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000); } - ((ClusterConnection)c).clearRegionCache(); + c.clearRegionLocationCache(); } LOG.info("Running GETs"); Get get = null; @@ -276,7 +276,7 @@ public class TestMetaWithReplicas { util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0); util.getHBaseClusterInterface().waitForActiveAndReadyMaster(); LOG.info("Master active!"); - ((ClusterConnection)c).clearRegionCache(); + c.clearRegionLocationCache(); } conf.setBoolean(HConstants.USE_META_REPLICAS, false); LOG.info("Running GETs no replicas"); @@ -352,19 +352,24 @@ public class TestMetaWithReplicas { }; } + private List getMetaRegionLocations() throws IOException { + try (RegionLocator locator = + TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) { + return locator.getAllRegionLocations(); + } + } + @Nullable private String checkMetaLocationAndExplain(int originalReplicaCount) throws KeeperException, IOException { List metaZnodes = TEST_UTIL.getZooKeeperWatcher().getMetaReplicaNodes(); if (metaZnodes.size() == originalReplicaCount) { - RegionLocations rl = ((ClusterConnection) TEST_UTIL.getConnection()) - .locateRegion(TableName.META_TABLE_NAME, - HConstants.EMPTY_START_ROW, false, false); - for (HRegionLocation location : rl.getRegionLocations()) { + List locs = getMetaRegionLocations(); + for (HRegionLocation location : locs) { if (location == null) { - return "Null location found in " + rl.toString(); + return "Null location found in " + locs; } - if (location.getRegionInfo() == null) { + if (location.getRegion() == null) { return "Null regionInfo for location " + location; } if (location.getHostname() == null) { @@ -387,8 +392,7 @@ public class TestMetaWithReplicas { public void testHBaseFsckWithFewerMetaReplicas() throws Exception { ClusterConnection c = (ClusterConnection)ConnectionFactory.createConnection( TEST_UTIL.getConfiguration()); - RegionLocations rl = c.locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, - false, false); + RegionLocations rl = new RegionLocations(getMetaRegionLocations()); HBaseFsckRepair.closeRegionSilentlyAndWait(c, rl.getRegionLocation(1).getServerName(), rl.getRegionLocation(1).getRegionInfo()); // check that problem exists @@ -405,8 +409,7 @@ public class TestMetaWithReplicas { public void testHBaseFsckWithFewerMetaReplicaZnodes() throws Exception { ClusterConnection c = (ClusterConnection)ConnectionFactory.createConnection( TEST_UTIL.getConfiguration()); - RegionLocations rl = c.locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, - false, false); + RegionLocations rl = new RegionLocations(getMetaRegionLocations()); HBaseFsckRepair.closeRegionSilentlyAndWait(c, rl.getRegionLocation(2).getServerName(), rl.getRegionLocation(2).getRegionInfo()); ZKWatcher zkw = TEST_UTIL.getZooKeeperWatcher(); @@ -484,17 +487,14 @@ public class TestMetaWithReplicas { // can be recovered try (ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { - RegionLocations rl = conn. - locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true); - HRegionLocation hrl = rl.getRegionLocation(1); + HRegionLocation hrl = getMetaRegionLocations().get(1); ServerName oldServer = hrl.getServerName(); TEST_UTIL.getHBaseClusterInterface().killRegionServer(oldServer); int i = 0; do { LOG.debug("Waiting for the replica " + hrl.getRegionInfo() + " to come up"); Thread.sleep(10000); //wait for the detection/recovery - rl = conn.locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true); - hrl = rl.getRegionLocation(1); + hrl = getMetaRegionLocations().get(1); i++; } while ((hrl == null || hrl.getServerName().equals(oldServer)) && i < 3); assertTrue(i != 3); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index a42b26d..987ac7e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -670,9 +670,9 @@ public class TestReplicaWithCluster { // within configured hbase.client.metaReplicaCallTimeout.scan from primary meta region. @Test public void testGetRegionLocationFromPrimaryMetaRegion() throws IOException, InterruptedException { - HTU.getAdmin().setBalancerRunning(false, true); + HTU.getAdmin().balancerSwitch(false, true); - ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(true); + ((ConnectionImplementation) HTU.getConnection()).setUseMetaReplicas(true); // Create table then get the single region for our new table. HTableDescriptor hdt = HTU.createTableDescriptor("testGetRegionLocationFromPrimaryMetaRegion"); @@ -684,13 +684,13 @@ public class TestReplicaWithCluster { RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true; // Get user table location, always get it from the primary meta replica - RegionLocations url = ((ClusterConnection) HTU.getConnection()) - .locateRegion(hdt.getTableName(), row, false, false); - + try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) { + locator.getRegionLocations(row, true); + } } finally { RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false; - ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(false); - HTU.getAdmin().setBalancerRunning(true, true); + ((ConnectionImplementation) HTU.getConnection()).setUseMetaReplicas(false); + HTU.getAdmin().balancerSwitch(true, true); HTU.getAdmin().disableTable(hdt.getTableName()); HTU.deleteTable(hdt.getTableName()); } @@ -703,25 +703,27 @@ public class TestReplicaWithCluster { // with the primary meta region. @Test public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException { - HTU.getAdmin().setBalancerRunning(false, true); + HTU.getAdmin().balancerSwitch(false, true); - ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(true); + ((ConnectionImplementation)HTU.getConnection()).setUseMetaReplicas(true); // Create table then get the single region for our new table. HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown"); hdt.setRegionReplication(2); try { - Table table = HTU.createTable(hdt, new byte[][] { f }, null); - // Get Meta location - RegionLocations mrl = ((ClusterConnection) HTU.getConnection()) - .locateRegion(TableName.META_TABLE_NAME, - HConstants.EMPTY_START_ROW, false, false); + RegionLocations mrl; + try ( + RegionLocator locator = HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) { + mrl = new RegionLocations(locator.getRegionLocations(HConstants.EMPTY_START_ROW, true)); + } // Get user table location - RegionLocations url = ((ClusterConnection) HTU.getConnection()) - .locateRegion(hdt.getTableName(), row, false, false); + RegionLocations url; + try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) { + url = new RegionLocations(locator.getRegionLocations(row, true)); + } // Make sure that user primary region is co-hosted with the meta region if (!url.getDefaultRegionLocation().getServerName().equals( @@ -740,12 +742,15 @@ public class TestReplicaWithCluster { // Wait until the meta table is updated with new location info while (true) { - mrl = ((ClusterConnection) HTU.getConnection()) - .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false); + try (RegionLocator locator = + HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) { + mrl = new RegionLocations(locator.getRegionLocations(HConstants.EMPTY_START_ROW, true)); + } // Get user table location - url = ((ClusterConnection) HTU.getConnection()) - .locateRegion(hdt.getTableName(), row, false, true); + try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) { + url = new RegionLocations(locator.getRegionLocations(row, true)); + } LOG.info("meta locations " + mrl); LOG.info("table locations " + url); @@ -787,9 +792,9 @@ public class TestReplicaWithCluster { Assert.assertTrue(r.isStale()); } finally { - ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(false); + ((ConnectionImplementation)HTU.getConnection()).setUseMetaReplicas(false); RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false; - HTU.getAdmin().setBalancerRunning(true, true); + HTU.getAdmin().balancerSwitch(true, true); HTU.getAdmin().disableTable(hdt.getTableName()); HTU.deleteTable(hdt.getTableName()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 4a72410..6616b3b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -229,7 +229,7 @@ public class TestReplicasClient { @Before public void before() throws IOException { - ((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionCache(); + HTU.getConnection().clearRegionLocationCache(); try { openRegion(hriPrimary); } catch (Exception ignored) { @@ -250,8 +250,7 @@ public class TestReplicasClient { closeRegion(hriPrimary); } catch (Exception ignored) { } - - ((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionCache(); + HTU.getConnection().clearRegionLocationCache(); } private HRegionServer getRS() { @@ -329,17 +328,17 @@ public class TestReplicasClient { public void testLocations() throws Exception { byte[] b1 = Bytes.toBytes("testLocations"); openRegion(hriSecondary); - ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection(); + ConnectionImplementation hc = (ConnectionImplementation) HTU.getConnection(); try { - hc.clearRegionCache(); + hc.clearRegionLocationCache(); RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false); Assert.assertEquals(2, rl.size()); rl = hc.locateRegion(table.getName(), b1, true, false); Assert.assertEquals(2, rl.size()); - hc.clearRegionCache(); + hc.clearRegionLocationCache(); rl = hc.locateRegion(table.getName(), b1, true, false); Assert.assertEquals(2, rl.size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java index 026010d..93c6b4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java @@ -204,7 +204,7 @@ public class TestSeparateClientZKCluster { public void testMetaMoveDuringClientZkClusterRestart() throws Exception { TableName tn = TableName.valueOf(name.getMethodName()); // create table - ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection(); + Connection conn = TEST_UTIL.getConnection(); Admin admin = conn.getAdmin(); HTable table = (HTable) conn.getTable(tn); try { @@ -218,7 +218,7 @@ public class TestSeparateClientZKCluster { put.addColumn(family, qualifier, value); table.put(put); // invalid connection cache - conn.clearRegionCache(); + conn.clearRegionLocationCache(); // stop client zk cluster clientZkCluster.shutdown(); // stop current meta server and confirm the server shutdown process diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java index 0562c90..2f95a90 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java @@ -297,7 +297,7 @@ public class TestSnapshotCloneIndependence { */ private void runTestRegionOperationsIndependent() throws Exception { // Verify that region information is the same pre-split - ((ClusterConnection) UTIL.getConnection()).clearRegionCache(); + UTIL.getConnection().clearRegionLocationCache(); List originalTableHRegions = admin.getTableRegions(originalTableName); final int originalRegionCount = originalTableHRegions.size(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java index 446c3f9..0c2532e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java @@ -25,8 +25,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -58,32 +59,33 @@ public class TestMetaAssignmentWithStopMaster { @Test public void testStopActiveMaster() throws Exception { - ClusterConnection conn = - (ClusterConnection) ConnectionFactory.createConnection(UTIL.getConfiguration()); - ServerName oldMetaServer = conn.locateRegions(TableName.META_TABLE_NAME).get(0).getServerName(); - ServerName oldMaster = UTIL.getMiniHBaseCluster().getMaster().getServerName(); + try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration()); + RegionLocator locator = conn.getRegionLocator(TableName.META_TABLE_NAME)) { + ServerName oldMetaServer = locator.getAllRegionLocations().get(0).getServerName(); + ServerName oldMaster = UTIL.getMiniHBaseCluster().getMaster().getServerName(); - UTIL.getMiniHBaseCluster().getMaster().stop("Stop master for test"); - long startTime = System.currentTimeMillis(); - while (UTIL.getMiniHBaseCluster().getMaster() == null || UTIL.getMiniHBaseCluster().getMaster() - .getServerName().equals(oldMaster)) { - LOG.info("Wait the standby master become active"); - Thread.sleep(3000); - if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) { - fail("Wait too long for standby master become active"); + UTIL.getMiniHBaseCluster().getMaster().stop("Stop master for test"); + long startTime = System.currentTimeMillis(); + while (UTIL.getMiniHBaseCluster().getMaster() == null || + UTIL.getMiniHBaseCluster().getMaster().getServerName().equals(oldMaster)) { + LOG.info("Wait the standby master become active"); + Thread.sleep(3000); + if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) { + fail("Wait too long for standby master become active"); + } } - } - startTime = System.currentTimeMillis(); - while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) { - LOG.info("Wait the new active master to be initialized"); - Thread.sleep(3000); - if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) { - fail("Wait too long for the new active master to be initialized"); + startTime = System.currentTimeMillis(); + while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) { + LOG.info("Wait the new active master to be initialized"); + Thread.sleep(3000); + if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) { + fail("Wait too long for the new active master to be initialized"); + } } - } - ServerName newMetaServer = conn.locateRegions(TableName.META_TABLE_NAME).get(0).getServerName(); - assertTrue("The new meta server " + newMetaServer + " should be same with" + + ServerName newMetaServer = locator.getAllRegionLocations().get(0).getServerName(); + assertTrue("The new meta server " + newMetaServer + " should be same with" + " the old meta server " + oldMetaServer, newMetaServer.equals(oldMetaServer)); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java index a4b99a1..fcc1bb8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java @@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; @@ -50,6 +49,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionLocator; @@ -366,24 +366,15 @@ public class TestLoadIncrementalHFilesSplitRecovery { private ClusterConnection getMockedConnection(final Configuration conf) throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException { - ClusterConnection c = Mockito.mock(ClusterConnection.class); - Mockito.when(c.getConfiguration()).thenReturn(conf); - Mockito.doNothing().when(c).close(); - // Make it so we return a particular location when asked. - final HRegionLocation loc = new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, - ServerName.valueOf("example.org", 1234, 0)); - Mockito.when( - c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean())) - .thenReturn(loc); - Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc); - ClientProtos.ClientService.BlockingInterface hri = - Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); + ServerName sn = ServerName.valueOf("example.org", 1234, 0); + RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO; + ClientProtos.ClientService.BlockingInterface client = + Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); Mockito - .when( - hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any())) - .thenThrow(new ServiceException(new IOException("injecting bulk load error"))); - Mockito.when(c.getClient(Mockito.any())).thenReturn(hri); - return c; + .when( + client.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any())) + .thenThrow(new ServiceException(new IOException("injecting bulk load error"))); + return HConnectionTestingUtility.getMockedConnectionAndDecorate(conf, null, client, sn, hri); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java index fc23d51..d25ccef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java @@ -307,8 +307,7 @@ public class BaseTestHBaseFsck { tbl.close(); tbl = null; } - - ((ClusterConnection) connection).clearRegionCache(); + connection.clearRegionLocationCache(); deleteTable(TEST_UTIL, tablename); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java index 410dd0c..0a66ec0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java @@ -19,33 +19,34 @@ package org.apache.hadoop.hbase.util; import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; - import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; + /** * Common base class for reader and writer parts of multi-thread HBase load * test (See LoadTestTool). @@ -491,7 +492,6 @@ public abstract class MultiThreadedAction { } private void printLocations(Result r) { - RegionLocations rl = null; if (r == null) { LOG.info("FAILED FOR null Result"); return; @@ -500,15 +500,14 @@ public abstract class MultiThreadedAction { if (r.getRow() == null) { return; } - try { - rl = ((ClusterConnection)connection).locateRegion(tableName, r.getRow(), true, true); + try (RegionLocator locator = connection.getRegionLocator(tableName)) { + List locs = locator.getRegionLocations(r.getRow()); + for (HRegionLocation h : locs) { + LOG.info("LOCATION " + h); + } } catch (IOException e) { LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow())); } - HRegionLocation locations[] = rl.getRegionLocations(); - for (HRegionLocation h : locations) { - LOG.info("LOCATION " + h); - } } private String resultToString(Result result) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index 6864366..27f5fb9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -21,14 +21,13 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Get; - import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; @@ -377,8 +376,10 @@ public class MultiThreadedReader extends MultiThreadedAction numKeysVerified.incrementAndGet(); } } else { - HRegionLocation hloc = connection.getRegionLocation(tableName, - get.getRow(), false); + HRegionLocation hloc; + try (RegionLocator locator = connection.getRegionLocator(tableName)) { + hloc = locator.getRegionLocation(get.getRow()); + } String rowKey = Bytes.toString(get.getRow()); LOG.info("Key = " + rowKey + ", Region location: " + hloc); if(isNullExpected) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java index 54be0d3..1ebc9b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java @@ -27,10 +27,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; import org.slf4j.Logger; @@ -97,9 +97,9 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) { HRegionLocation cached = null, real = null; - try { - cached = connection.getRegionLocation(tableName, rowKey, false); - real = connection.getRegionLocation(tableName, rowKey, true); + try (RegionLocator locator = connection.getRegionLocator(tableName)) { + cached = locator.getRegionLocation(rowKey, false); + real = locator.getRegionLocation(rowKey, true); } catch (Throwable t) { // Cannot obtain region information for another catch block - too bad! }