Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 847BE18A63 for ; Tue, 29 Mar 2016 00:56:19 +0000 (UTC) Received: (qmail 4719 invoked by uid 500); 29 Mar 2016 00:56:17 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 4605 invoked by uid 500); 29 Mar 2016 00:56:17 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 4192 invoked by uid 99); 29 Mar 2016 00:56:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Mar 2016 00:56:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0554FE9770; Tue, 29 Mar 2016 00:56:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: enis@apache.org To: commits@hbase.apache.org Date: Tue, 29 Mar 2016 00:56:24 -0000 Message-Id: In-Reply-To: <2ed0a1a278a8474aaf725b792bad6e69@git.apache.org> References: <2ed0a1a278a8474aaf725b792bad6e69@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/12] hbase git commit: HBASE-15295 MutateTableAccess.multiMutate() does not get high priority causing a deadlock HBASE-15295 MutateTableAccess.multiMutate() does not get high priority causing a deadlock Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5202d3c2 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5202d3c2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5202d3c2 Branch: refs/heads/branch-1 Commit: 5202d3c25b394cac3a00a5afc0693ad221fad9d6 Parents: 5851ad0 Author: Enis Soztutar Authored: Wed Mar 23 12:30:41 2016 -0700 Committer: Enis Soztutar Committed: Mon Mar 28 17:00:21 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/HRegionInfo.java | 1 + .../hbase/client/BufferedMutatorImpl.java | 2 +- .../hadoop/hbase/client/ClusterConnection.java | 20 +- .../hadoop/hbase/client/ConnectionAdapter.java | 16 + .../hbase/client/ConnectionConfiguration.java | 132 ++++++ .../hadoop/hbase/client/ConnectionManager.java | 27 +- .../apache/hadoop/hbase/client/HBaseAdmin.java | 426 ++++++++++++++----- .../org/apache/hadoop/hbase/client/HTable.java | 55 +-- .../hadoop/hbase/client/TableConfiguration.java | 132 ------ .../hadoop/hbase/ipc/AbstractRpcClient.java | 2 +- .../hadoop/hbase/ipc/CoprocessorRpcChannel.java | 9 +- .../hbase/ipc/MasterCoprocessorRpcChannel.java | 17 +- .../hbase/ipc/RegionCoprocessorRpcChannel.java | 44 +- .../ipc/RegionServerCoprocessorRpcChannel.java | 10 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 156 ++++--- .../security/access/AccessControlClient.java | 46 +- .../hbase/zookeeper/MetaTableLocator.java | 18 +- .../hbase/client/TestSnapshotFromAdmin.java | 31 +- .../hadoop/hbase/DistributedHBaseCluster.java | 2 +- .../hbase/tmpl/regionserver/RSStatusTmpl.jamon | 2 +- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 5 +- .../apache/hadoop/hbase/ipc/RpcScheduler.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 12 +- .../hbase/master/RegionPlacementMaintainer.java | 2 +- .../hadoop/hbase/master/ServerManager.java | 33 +- .../apache/hadoop/hbase/MiniHBaseCluster.java | 4 +- .../hadoop/hbase/TestGlobalMemStoreSize.java | 14 +- .../hadoop/hbase/TestMetaTableAccessor.java | 79 ++++ .../hadoop/hbase/TestMetaTableLocator.java | 8 +- .../hbase/client/HConnectionTestingUtility.java | 7 +- .../apache/hadoop/hbase/client/TestAdmin1.java | 4 +- .../hadoop/hbase/client/TestFromClientSide.java | 6 +- .../hbase/client/TestHBaseAdminNoCluster.java | 10 + .../client/TestScannersFromClientSide.java | 6 +- .../hbase/ipc/DelegatingRpcScheduler.java | 76 ++++ .../TestLoadIncrementalHFilesSplitRecovery.java | 5 +- .../hbase/master/TestClockSkewDetection.java | 3 + .../hadoop/hbase/master/TestMasterFailover.java | 12 +- .../master/TestZKBasedOpenCloseRegion.java | 2 +- .../hbase/security/access/SecureTestUtil.java | 12 +- .../security/access/TestAccessController.java | 20 +- .../security/access/TestNamespaceCommands.java | 13 +- 42 files changed, 1012 insertions(+), 471 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java index 918f146..09ceeb9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -229,6 +229,7 @@ public class HRegionInfo implements Comparable { private TableName tableName = null; /** HRegionInfo for first meta region */ + // TODO: How come Meta regions still do not have encoded region names? Fix. public static final HRegionInfo FIRST_META_REGIONINFO = new HRegionInfo(1L, TableName.META_TABLE_NAME); http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 9ba2e13..6220cd6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -87,7 +87,7 @@ public class BufferedMutatorImpl implements BufferedMutator { this.pool = params.getPool(); this.listener = params.getListener(); - TableConfiguration tableConf = new TableConfiguration(conf); + ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? params.getWriteBufferSize() : tableConf.getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index f23eb82..f4d464f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -30,11 +30,12 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; -/** Internal methods on HConnection that should not be used by user code. */ +/** Internal methods on Connection that should not be used by user code. */ @InterfaceAudience.Private // NOTE: Although this class is public, this class is meant to be used directly from internal // classes and unit tests only. @@ -283,7 +284,22 @@ public interface ClusterConnection extends HConnection { * @return RpcRetryingCallerFactory */ RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf); - + + /** + * @return Connection's RpcRetryingCallerFactory instance + */ + RpcRetryingCallerFactory getRpcRetryingCallerFactory(); + + /** + * @return Connection's RpcControllerFactory instance + */ + RpcControllerFactory getRpcControllerFactory(); + + /** + * @return a ConnectionConfiguration object holding parsed configuration values + */ + ConnectionConfiguration getConnectionConfiguration(); + /** * * @return true if this is a managed connection. http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 7a18ea5..040fa6f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; /** * An internal class that delegates to an {@link HConnection} instance. @@ -474,4 +475,19 @@ abstract class ConnectionAdapter implements ClusterConnection { public boolean hasCellBlockSupport() { return wrappedConnection.hasCellBlockSupport(); } + + @Override + public ConnectionConfiguration getConnectionConfiguration() { + return wrappedConnection.getConnectionConfiguration(); + } + + @Override + public RpcRetryingCallerFactory getRpcRetryingCallerFactory() { + return wrappedConnection.getRpcRetryingCallerFactory(); + } + + @Override + public RpcControllerFactory getRpcControllerFactory() { + return wrappedConnection.getRpcControllerFactory(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java new file mode 100644 index 0000000..de760d4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Configuration parameters for the connection. + * Configuration is a heavy weight registry that does a lot of string operations and regex matching. + * Method calls into Configuration account for high CPU usage and have huge performance impact. + * This class caches connection-related configuration values in the ConnectionConfiguration + * object so that expensive conf.getXXX() calls are avoided every time HTable, etc is instantiated. + * see HBASE-12128 + */ +@InterfaceAudience.Private +public class ConnectionConfiguration { + + public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; + public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; + public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; + public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1; + + private final long writeBufferSize; + private final int metaOperationTimeout; + private final int operationTimeout; + private final int scannerCaching; + private final long scannerMaxResultSize; + private final int primaryCallTimeoutMicroSecond; + private final int replicaCallTimeoutMicroSecondScan; + private final int retries; + private final int maxKeyValueSize; + + /** + * Constructor + * @param conf Configuration object + */ + ConnectionConfiguration(Configuration conf) { + this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); + + this.metaOperationTimeout = conf.getInt( + HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + + this.operationTimeout = conf.getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + + this.scannerCaching = conf.getInt( + HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + + this.scannerMaxResultSize = + conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + + this.primaryCallTimeoutMicroSecond = + conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms + + this.replicaCallTimeoutMicroSecondScan = + conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms + + this.retries = conf.getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + + this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); + } + + /** + * Constructor + * This is for internal testing purpose (using the default value). + * In real usage, we should read the configuration from the Configuration object. + */ + @VisibleForTesting + protected ConnectionConfiguration() { + this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT; + this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; + this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; + this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; + this.scannerMaxResultSize = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; + this.primaryCallTimeoutMicroSecond = 10000; + this.replicaCallTimeoutMicroSecondScan = 1000000; + this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; + this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; + } + + public long getWriteBufferSize() { + return writeBufferSize; + } + + public int getMetaOperationTimeout() { + return metaOperationTimeout; + } + + public int getOperationTimeout() { + return operationTimeout; + } + + public int getScannerCaching() { + return scannerCaching; + } + + public int getPrimaryCallTimeoutMicroSecond() { + return primaryCallTimeoutMicroSecond; + } + + public int getReplicaCallTimeoutMicroSecondScan() { + return replicaCallTimeoutMicroSecondScan; + } + + public int getRetriesNumber() { + return retries; + } + + public int getMaxKeyValueSize() { + return maxKeyValueSize; + } + + public long getScannerMaxResultSize() { + return scannerMaxResultSize; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 2d55f6e..dc4a7c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -595,7 +595,7 @@ class ConnectionManager { // cache the configuration value for tables so that we can avoid calling // the expensive Configuration to fetch the value multiple times. - private final TableConfiguration tableConfig; + private final ConnectionConfiguration connectionConfig; // Client rpc instance. private RpcClient rpcClient; @@ -644,13 +644,13 @@ class ConnectionManager { this.user = user; this.batchPool = pool; this.managed = managed; - this.tableConfig = new TableConfiguration(conf); + this.connectionConfig = new ConnectionConfiguration(conf); this.closed = false; this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS); - this.numTries = tableConfig.getRetriesNumber(); + this.numTries = connectionConfig.getRetriesNumber(); this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); @@ -746,7 +746,7 @@ class ConnectionManager { if (managed) { throw new NeedUnmanagedConnectionException(); } - return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool); + return new HTable(tableName, this, connectionConfig, rpcCallerFactory, rpcControllerFactory, pool); } @Override @@ -758,10 +758,10 @@ class ConnectionManager { params.pool(HTable.getDefaultExecutor(getConfiguration())); } if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { - params.writeBufferSize(tableConfig.getWriteBufferSize()); + params.writeBufferSize(connectionConfig.getWriteBufferSize()); } if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { - params.maxKeyValueSize(tableConfig.getMaxKeyValueSize()); + params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); } return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); } @@ -2642,6 +2642,21 @@ class ConnectionManager { public boolean hasCellBlockSupport() { return this.rpcClient.hasCellBlockSupport(); } + + @Override + public ConnectionConfiguration getConnectionConfiguration() { + return this.connectionConfig; + } + + @Override + public RpcRetryingCallerFactory getRpcRetryingCallerFactory() { + return this.rpcCallerFactory; + } + + @Override + public RpcControllerFactory getRpcControllerFactory() { + return this.rpcControllerFactory; + } } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 3eb7e7a..fe27a59 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -72,7 +72,9 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; @@ -214,6 +216,7 @@ public class HBaseAdmin implements Admin { private int operationTimeout; private RpcRetryingCallerFactory rpcCallerFactory; + private RpcControllerFactory rpcControllerFactory; private NonceGenerator ng; @@ -261,6 +264,7 @@ public class HBaseAdmin implements Admin { this.conf = connection.getConfiguration(); this.connection = connection; + // TODO: receive ConnectionConfiguration here rather than re-parsing these configs every time. this.pause = this.conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, @@ -272,7 +276,8 @@ public class HBaseAdmin implements Admin { this.syncWaitTimeout = this.conf.getInt( "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); + this.rpcCallerFactory = connection.getRpcRetryingCallerFactory(); + this.rpcControllerFactory = connection.getRpcControllerFactory(); this.ng = this.connection.getNonceGenerator(); } @@ -330,17 +335,19 @@ public class HBaseAdmin implements Admin { */ @Override public Future abortProcedureAsync( - final long procId, - final boolean mayInterruptIfRunning) throws IOException { + final long procId, + final boolean mayInterruptIfRunning) throws IOException { Boolean abortProcResponse = executeCallable( new MasterCallable(getConnection()) { - @Override - public AbortProcedureResponse call(int callTimeout) throws ServiceException { - AbortProcedureRequest abortProcRequest = - AbortProcedureRequest.newBuilder().setProcId(procId).build(); - return master.abortProcedure(null,abortProcRequest); - } - }).getIsProcedureAborted(); + @Override + public AbortProcedureResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + AbortProcedureRequest abortProcRequest = + AbortProcedureRequest.newBuilder().setProcId(procId).build(); + return master.abortProcedure(controller, abortProcRequest); + } + }).getIsProcedureAborted(); AbortProcedureFuture abortProcFuture = new AbortProcedureFuture(this, procId, abortProcResponse); @@ -428,9 +435,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables); - return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req)); + return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req)); } }); } @@ -507,9 +516,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public TableName[] call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableNamesRequest req = RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables); - return ProtobufUtil.getTableNameArray(master.getTableNames(null, req) + return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req) .getTableNamesList()); } }); @@ -531,21 +542,24 @@ public class HBaseAdmin implements Admin { @Override public HTableDescriptor getTableDescriptor(final TableName tableName) throws TableNotFoundException, IOException { - return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, operationTimeout); + return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory, + operationTimeout); } - static HTableDescriptor getTableDescriptor(final TableName tableName, - HConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + static HTableDescriptor getTableDescriptor(final TableName tableName, HConnection connection, + RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, int operationTimeout) throws TableNotFoundException, IOException { if (tableName == null) return null; HTableDescriptor htd = executeCallable(new MasterCallable(connection) { @Override public HTableDescriptor call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsResponse htds; GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(tableName); - htds = master.getTableDescriptors(null, req); + htds = master.getTableDescriptors(controller, req); if (!htds.getTableSchemaList().isEmpty()) { return HTableDescriptor.convert(htds.getTableSchemaList().get(0)); @@ -735,14 +749,17 @@ public class HBaseAdmin implements Admin { } CreateTableResponse response = executeCallable( - new MasterCallable(getConnection()) { - @Override - public CreateTableResponse call(int callTimeout) throws ServiceException { - CreateTableRequest request = RequestConverter.buildCreateTableRequest( - desc, splitKeys, ng.getNonceGroup(), ng.newNonce()); - return master.createTable(null, request); - } - }); + new MasterCallable(getConnection()) { + @Override + public CreateTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(desc.getTableName()); + CreateTableRequest request = RequestConverter.buildCreateTableRequest( + desc, splitKeys, ng.getNonceGroup(), ng.newNonce()); + return master.createTable(controller, request); + } + }); return new CreateTableFuture(this, desc, splitKeys, response); } @@ -906,14 +923,17 @@ public class HBaseAdmin implements Admin { // TODO: This should be called Async but it will break binary compatibility private Future deleteTableAsyncV2(final TableName tableName) throws IOException { DeleteTableResponse response = executeCallable( - new MasterCallable(getConnection()) { - @Override - public DeleteTableResponse call(int callTimeout) throws ServiceException { - DeleteTableRequest req = - RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); - return master.deleteTable(null,req); - } - }); + new MasterCallable(getConnection()) { + @Override + public DeleteTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + DeleteTableRequest req = + RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce()); + return master.deleteTable(controller,req); + } + }); return new DeleteTableFuture(this, tableName, response); } @@ -1154,15 +1174,19 @@ public class HBaseAdmin implements Admin { private Future enableTableAsyncV2(final TableName tableName) throws IOException { TableName.isLegalFullyQualifiedTableName(tableName.getName()); EnableTableResponse response = executeCallable( - new MasterCallable(getConnection()) { - @Override - public EnableTableResponse call(int callTimeout) throws ServiceException { - LOG.info("Started enable of " + tableName); - EnableTableRequest req = - RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); - return master.enableTable(null,req); - } - }); + new MasterCallable(getConnection()) { + @Override + public EnableTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + + LOG.info("Started enable of " + tableName); + EnableTableRequest req = + RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce()); + return master.enableTable(controller,req); + } + }); return new EnableTableFuture(this, tableName, response); } @@ -1342,15 +1366,20 @@ public class HBaseAdmin implements Admin { private Future disableTableAsyncV2(final TableName tableName) throws IOException { TableName.isLegalFullyQualifiedTableName(tableName.getName()); DisableTableResponse response = executeCallable( - new MasterCallable(getConnection()) { - @Override - public DisableTableResponse call(int callTimeout) throws ServiceException { - LOG.info("Started disable of " + tableName); - DisableTableRequest req = - RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); - return master.disableTable(null, req); - } - }); + new MasterCallable(getConnection()) { + @Override + public DisableTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + + LOG.info("Started disable of " + tableName); + DisableTableRequest req = + RequestConverter.buildDisableTableRequest( + tableName, ng.getNonceGroup(), ng.newNonce()); + return master.disableTable(controller, req); + } + }); return new DisableTableFuture(this, tableName, response); } @@ -1557,9 +1586,13 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable>(getConnection()) { @Override public Pair call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + GetSchemaAlterStatusRequest req = RequestConverter .buildGetSchemaAlterStatusRequest(tableName); - GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(null, req); + GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req); Pair pair = new Pair(Integer.valueOf(ret .getYetToUpdateRegions()), Integer.valueOf(ret.getTotalRegions())); return pair; @@ -1598,7 +1631,6 @@ public class HBaseAdmin implements Admin { addColumn(TableName.valueOf(tableName), column); } - /** * Add a column to an existing table. * Asynchronous operation. @@ -1626,9 +1658,12 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); AddColumnRequest req = RequestConverter.buildAddColumnRequest( tableName, column, ng.getNonceGroup(), ng.newNonce()); - master.addColumn(null,req); + master.addColumn(controller,req); return null; } }); @@ -1674,9 +1709,12 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest( tableName, columnName, ng.getNonceGroup(), ng.newNonce()); - master.deleteColumn(null,req); + master.deleteColumn(controller, req); return null; } }); @@ -1708,8 +1746,6 @@ public class HBaseAdmin implements Admin { modifyColumn(TableName.valueOf(tableName), descriptor); } - - /** * Modify an existing column family on a table. * Asynchronous operation. @@ -1724,9 +1760,12 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest( tableName, descriptor, ng.getNonceGroup(), ng.newNonce()); - master.modifyColumn(null,req); + master.modifyColumn(controller, req); return null; } }); @@ -1812,7 +1851,10 @@ public class HBaseAdmin implements Admin { CloseRegionRequest request = RequestConverter.buildCloseRegionRequest(sn, encodedRegionName, false); try { - CloseRegionResponse response = admin.closeRegion(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + + // TODO: this does not do retries, it should. Set priority and timeout in controller + CloseRegionResponse response = admin.closeRegion(controller, request); boolean isRegionClosed = response.getClosed(); if (false == isRegionClosed) { LOG.error("Not able to close the region " + encodedRegionName + "."); @@ -1834,8 +1876,10 @@ public class HBaseAdmin implements Admin { public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + // Close the region without updating zk state. - ProtobufUtil.closeRegion(admin, sn, hri.getRegionName(), false); + ProtobufUtil.closeRegion(controller, admin, sn, hri.getRegionName(), false); } /** @@ -1844,7 +1888,8 @@ public class HBaseAdmin implements Admin { @Override public List getOnlineRegions(final ServerName sn) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - return ProtobufUtil.getOnlineRegions(admin); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + return ProtobufUtil.getOnlineRegions(controller, admin); } /** @@ -1903,11 +1948,12 @@ public class HBaseAdmin implements Admin { private void flush(final ServerName sn, final HRegionInfo hri) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); AdminService.BlockingInterface admin = this.connection.getAdmin(sn); FlushRegionRequest request = RequestConverter.buildFlushRegionRequest(hri.getRegionName()); try { - admin.flushRegion(null, request); + admin.flushRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -2159,11 +2205,13 @@ public class HBaseAdmin implements Admin { private void compact(final ServerName sn, final HRegionInfo hri, final boolean major, final byte [] family) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); AdminService.BlockingInterface admin = this.connection.getAdmin(sn); CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); try { - admin.compactRegion(null, request); + // TODO: this does not do retries, it should. Set priority and timeout in controller + admin.compactRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -2189,10 +2237,17 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(encodedRegionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + try { MoveRegionRequest request = RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName); - master.moveRegion(null, request); + master.moveRegion(controller, request); } catch (DeserializationException de) { LOG.error("Could not parse destination server name: " + de); throw new ServiceException(new DoNotRetryIOException(de)); @@ -2202,6 +2257,11 @@ public class HBaseAdmin implements Admin { }); } + private boolean isMetaRegion(final byte[] regionName) { + return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) + || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()); + } + /** * @param regionName * Region name to assign. @@ -2216,9 +2276,16 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + AssignRegionRequest request = RequestConverter.buildAssignRegionRequest(toBeAssigned); - master.assignRegion(null,request); + master.assignRegion(controller,request); return null; } }); @@ -2245,9 +2312,15 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } UnassignRegionRequest request = RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force); - master.unassignRegion(null, request); + master.unassignRegion(controller, request); return null; } }); @@ -2271,7 +2344,13 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.offlineRegion(null,RequestConverter.buildOfflineRegionRequest(regionName)); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName)); return null; } }); @@ -2289,9 +2368,12 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + SetBalancerRunningRequest req = RequestConverter.buildSetBalancerRunningRequest(on, synchronous); - return master.setBalancerRunning(null, req).getPrevBalanceValue(); + return master.setBalancerRunning(controller, req).getPrevBalanceValue(); } }); } @@ -2307,7 +2389,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.balance(null, RequestConverter.buildBalanceRequest(false)).getBalancerRan(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.balance(controller, + RequestConverter.buildBalanceRequest(false)).getBalancerRan(); } }); } @@ -2317,7 +2403,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.balance(null, RequestConverter.buildBalanceRequest(force)).getBalancerRan(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.balance(controller, + RequestConverter.buildBalanceRequest(force)).getBalancerRan(); } }); } @@ -2333,8 +2423,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.isBalancerEnabled(null, RequestConverter.buildIsBalancerEnabledRequest()) - .getEnabled(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isBalancerEnabled(controller, + RequestConverter.buildIsBalancerEnabledRequest()).getEnabled(); } }); } @@ -2349,7 +2442,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.normalize(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.normalize(controller, RequestConverter.buildNormalizeRequest()).getNormalizerRan(); } }); @@ -2364,7 +2460,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.isNormalizerEnabled(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isNormalizerEnabled(controller, RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled(); } }); @@ -2379,9 +2478,12 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + SetNormalizerRunningRequest req = RequestConverter.buildSetNormalizerRunningRequest(on); - return master.setNormalizerRunning(null, req).getPrevNormalizerValue(); + return master.setNormalizerRunning(controller, req).getPrevNormalizerValue(); } }); } @@ -2398,7 +2500,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.enableCatalogJanitor(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.enableCatalogJanitor(controller, RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue(); } }); @@ -2414,7 +2519,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Integer call(int callTimeout) throws ServiceException { - return master.runCatalogScan(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.runCatalogScan(controller, RequestConverter.buildCatalogScanRequest()).getScanResult(); } }); @@ -2429,7 +2537,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.isCatalogJanitorEnabled(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isCatalogJanitorEnabled(controller, RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue(); } }); @@ -2474,11 +2585,14 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + try { DispatchMergingRegionsRequest request = RequestConverter .buildDispatchMergingRegionsRequest(encodedNameOfRegionA, encodedNameOfRegionB, forcible); - master.dispatchMergingRegions(null, request); + master.dispatchMergingRegions(controller, request); } catch (DeserializationException de) { LOG.error("Could not parse destination server name: " + de); } @@ -2610,9 +2724,12 @@ public class HBaseAdmin implements Admin { Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) { throw new IOException("should not give a splitkey which equals to startkey!"); } - // TODO: This is not executed via retries + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(hri.getTable()); + + // TODO: this does not do retries, it should. Set priority and timeout in controller AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - ProtobufUtil.split(admin, hri, splitPoint); + ProtobufUtil.split(controller, admin, hri, splitPoint); } /** @@ -2635,9 +2752,12 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); ModifyTableRequest request = RequestConverter.buildModifyTableRequest( tableName, htd, ng.getNonceGroup(), ng.newNonce()); - master.modifyTable(null, request); + master.modifyTable(controller, request); return null; } }); @@ -2750,7 +2870,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.shutdown(null,ShutdownRequest.newBuilder().build()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(HConstants.HIGH_QOS); + master.shutdown(controller, ShutdownRequest.newBuilder().build()); return null; } }); @@ -2767,7 +2890,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.stopMaster(null, StopMasterRequest.newBuilder().build()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(HConstants.HIGH_QOS); + master.stopMaster(controller, StopMasterRequest.newBuilder().build()); return null; } }); @@ -2788,8 +2914,12 @@ public class HBaseAdmin implements Admin { this.connection.getAdmin(ServerName.valueOf(hostname, port, 0)); StopServerRequest request = RequestConverter.buildStopServerRequest( "Called by admin client " + this.connection.toString()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + + controller.setPriority(HConstants.HIGH_QOS); try { - admin.stopServer(null, request); + // TODO: this does not do retries, it should. Set priority and timeout in controller + admin.stopServer(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -2805,8 +2935,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public ClusterStatus call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(); - return ClusterStatus.convert(master.getClusterStatus(null, req).getClusterStatus()); + return ClusterStatus.convert(master.getClusterStatus(controller, req).getClusterStatus()); } }); } @@ -2829,7 +2961,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws Exception { - master.createNamespace(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // TODO: set priority based on NS? + master.createNamespace(controller, CreateNamespaceRequest.newBuilder() .setNamespaceDescriptor(ProtobufUtil .toProtoNamespaceDescriptor(descriptor)).build() @@ -2849,7 +2984,9 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws Exception { - master.modifyNamespace(null, ModifyNamespaceRequest.newBuilder(). + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder(). setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); return null; } @@ -2866,7 +3003,9 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws Exception { - master.deleteNamespace(null, DeleteNamespaceRequest.newBuilder(). + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder(). setNamespaceName(name).build()); return null; } @@ -2885,8 +3024,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public NamespaceDescriptor call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); return ProtobufUtil.toNamespaceDescriptor( - master.getNamespaceDescriptor(null, GetNamespaceDescriptorRequest.newBuilder(). + master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder(). setNamespaceName(name).build()).getNamespaceDescriptor()); } }); @@ -2903,9 +3044,12 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public NamespaceDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List list = - master.listNamespaceDescriptors(null, ListNamespaceDescriptorsRequest.newBuilder(). - build()).getNamespaceDescriptorList(); + master.listNamespaceDescriptors(controller, + ListNamespaceDescriptorsRequest.newBuilder().build()) + .getNamespaceDescriptorList(); NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()]; for(int i = 0; i < list.size(); i++) { res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i)); @@ -2926,8 +3070,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public ProcedureInfo[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List procList = master.listProcedures( - null, ListProceduresRequest.newBuilder().build()).getProcedureList(); + controller, ListProceduresRequest.newBuilder().build()).getProcedureList(); ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()]; for (int i = 0; i < procList.size(); i++) { procInfoList[i] = ProcedureInfo.convert(procList.get(i)); @@ -2949,9 +3095,12 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List list = - master.listTableDescriptorsByNamespace(null, ListTableDescriptorsByNamespaceRequest. - newBuilder().setNamespaceName(name).build()).getTableSchemaList(); + master.listTableDescriptorsByNamespace(controller, + ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name) + .build()).getTableSchemaList(); HTableDescriptor[] res = new HTableDescriptor[list.size()]; for(int i=0; i < list.size(); i++) { @@ -2974,8 +3123,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public TableName[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List tableNames = - master.listTableNamesByNamespace(null, ListTableNamesByNamespaceRequest. + master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest. newBuilder().setNamespaceName(name).build()) .getTableNameList(); TableName[] result = new TableName[tableNames.size()]; @@ -3073,9 +3224,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(tableNames); - return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req)); + return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req)); } }); } @@ -3121,8 +3274,11 @@ public class HBaseAdmin implements Admin { FailedLogCloseException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + try { - return admin.rollWALWriter(null, request); + // TODO: this does not do retries, it should. Set priority and timeout in controller + return admin.rollWALWriter(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -3267,7 +3423,9 @@ public class HBaseAdmin implements Admin { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( regionServerPair.getFirst().getRegionName(), true); - GetRegionInfoResponse response = admin.getRegionInfo(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + // TODO: this does not do retries, it should. Set priority and timeout in controller + GetRegionInfoResponse response = admin.getRegionInfo(controller, request); return response.getCompactionState(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -3475,7 +3633,9 @@ public class HBaseAdmin implements Admin { done = executeCallable(new MasterCallable(getConnection()) { @Override public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { - return master.isSnapshotDone(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isSnapshotDone(controller, request); } }); } @@ -3505,7 +3665,9 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public SnapshotResponse call(int callTimeout) throws ServiceException { - return master.snapshot(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.snapshot(controller, request); } }); } @@ -3537,7 +3699,9 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { - return master.isSnapshotDone(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isSnapshotDone(controller, IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build()); } }).getDone(); @@ -3792,7 +3956,9 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public ExecProcedureResponse call(int callTimeout) throws ServiceException { - return master.execProcedureWithRet(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.execProcedureWithRet(controller, request); } }); @@ -3826,7 +3992,9 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public ExecProcedureResponse call(int callTimeout) throws ServiceException { - return master.execProcedure(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.execProcedure(controller, request); } }); @@ -3890,7 +4058,9 @@ public class HBaseAdmin implements Admin { new MasterCallable(getConnection()) { @Override public IsProcedureDoneResponse call(int callTimeout) throws ServiceException { - return master.isProcedureDone(null, IsProcedureDoneRequest + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isProcedureDone(controller, IsProcedureDoneRequest .newBuilder().setProcedure(desc).build()); } }).getDone(); @@ -3936,7 +4106,9 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public IsRestoreSnapshotDoneResponse call(int callTimeout) throws ServiceException { - return master.isRestoreSnapshotDone(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isRestoreSnapshotDone(controller, request); } }); } @@ -3966,7 +4138,9 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public RestoreSnapshotResponse call(int callTimeout) throws ServiceException { - return master.restoreSnapshot(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.restoreSnapshot(controller, request); } }); } @@ -3981,8 +4155,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable>(getConnection()) { @Override public List call(int callTimeout) throws ServiceException { - return master.getCompletedSnapshots(null, GetCompletedSnapshotsRequest.newBuilder().build()) - .getSnapshotsList(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.getCompletedSnapshots(controller, + GetCompletedSnapshotsRequest.newBuilder().build()).getSnapshotsList(); } }); } @@ -4080,7 +4256,9 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.deleteSnapshot(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder(). setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build() ); @@ -4122,8 +4300,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - this.master.deleteSnapshot(null, DeleteSnapshotRequest.newBuilder().setSnapshot(snapshot) - .build()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder() + .setSnapshot(snapshot).build()); return null; } }); @@ -4173,7 +4353,9 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - this.master.setQuota(null, QuotaSettings.buildSetQuotaRequestProto(quota)); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota)); return null; } }); @@ -4308,10 +4490,12 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Long call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); MajorCompactionTimestampRequest req = MajorCompactionTimestampRequest.newBuilder() .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - return master.getLastMajorCompactionTimestamp(null, req).getCompactionTimestamp(); + return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp(); } }); } @@ -4321,13 +4505,16 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Long call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); MajorCompactionTimestampForRegionRequest req = MajorCompactionTimestampForRegionRequest .newBuilder() .setRegion( RequestConverter .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build(); - return master.getLastMajorCompactionTimestampForRegion(null, req).getCompactionTimestamp(); + return master.getLastMajorCompactionTimestampForRegion(controller, req) + .getCompactionTimestamp(); } }); } @@ -4386,7 +4573,9 @@ public class HBaseAdmin implements Admin { admin.getConnection()) { @Override public AbortProcedureResponse call(int callTimeout) throws ServiceException { - return master.abortProcedure(null, request); + PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController(); + controller.setCallTimeout(callTimeout); + return master.abortProcedure(controller, request); } }); } @@ -4598,9 +4787,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable>(getConnection()) { @Override public List call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build(); return ProtobufUtil.toSecurityCapabilityList( - master.getSecurityCapabilities(null, req).getCapabilitiesList()); + master.getSecurityCapabilities(controller, req).getCapabilitiesList()); } }); } catch (IOException e) { @@ -4641,4 +4832,7 @@ public class HBaseAdmin implements Admin { }); } + private RpcControllerFactory getRpcControllerFactory() { + return rpcControllerFactory; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index ec28c5a..fe006ba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -117,7 +117,7 @@ public class HTable implements HTableInterface, RegionLocator { protected ClusterConnection connection; private final TableName tableName; private volatile Configuration configuration; - private TableConfiguration tableConfiguration; + private ConnectionConfiguration connConfiguration; protected BufferedMutatorImpl mutator; private boolean autoFlush = true; private boolean closed = false; @@ -298,7 +298,7 @@ public class HTable implements HTableInterface, RegionLocator { */ @InterfaceAudience.Private public HTable(TableName tableName, final ClusterConnection connection, - final TableConfiguration tableConfig, + final ConnectionConfiguration tableConfig, final RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, final ExecutorService pool) throws IOException { @@ -309,7 +309,7 @@ public class HTable implements HTableInterface, RegionLocator { this.cleanupConnectionOnClose = false; this.connection = connection; this.configuration = connection.getConfiguration(); - this.tableConfiguration = tableConfig; + this.connConfiguration = tableConfig; this.pool = pool; if (pool == null) { this.pool = getDefaultExecutor(this.configuration); @@ -332,7 +332,7 @@ public class HTable implements HTableInterface, RegionLocator { protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException { connection = conn; tableName = params.getTableName(); - tableConfiguration = new TableConfiguration(connection.getConfiguration()); + connConfiguration = new ConnectionConfiguration(connection.getConfiguration()); cleanupPoolOnClose = false; cleanupConnectionOnClose = false; // used from tests, don't trust the connection is real @@ -350,14 +350,14 @@ public class HTable implements HTableInterface, RegionLocator { * setup this HTable's parameter based on the passed configuration */ private void finishSetup() throws IOException { - if (tableConfiguration == null) { - tableConfiguration = new TableConfiguration(configuration); + if (connConfiguration == null) { + connConfiguration = new ConnectionConfiguration(configuration); } this.operationTimeout = tableName.isSystemTable() ? - tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); - this.scannerCaching = tableConfiguration.getScannerCaching(); - this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize(); + connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); } @@ -570,23 +570,14 @@ public class HTable implements HTableInterface, RegionLocator { */ @Override public HTableDescriptor getTableDescriptor() throws IOException { - HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, - rpcCallerFactory, operationTimeout); + HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, + rpcControllerFactory, operationTimeout); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } return null; } - private V executeMasterCallable(MasterCallable callable) throws IOException { - RpcRetryingCaller caller = rpcCallerFactory.newCaller(); - try { - return caller.callWithRetries(callable, operationTimeout); - } finally { - callable.close(); - } - } - /** * To be removed in 2.0.0. * @deprecated Since 1.1.0. Use {@link RegionLocator#getStartEndKeys()} instead @@ -786,22 +777,22 @@ public class HTable implements HTableInterface, RegionLocator { if (scan.isSmall()) { return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { return new ReversedClientScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } } if (scan.isSmall()) { return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { return new ClientScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } } @@ -872,10 +863,10 @@ public class HTable implements HTableInterface, RegionLocator { // Call that takes into account the replica RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( - rpcControllerFactory, tableName, this.connection, get, pool, - tableConfiguration.getRetriesNumber(), - operationTimeout, - tableConfiguration.getPrimaryCallTimeoutMicroSecond()); + rpcControllerFactory, tableName, this.connection, get, pool, + connConfiguration.getRetriesNumber(), + operationTimeout, + connConfiguration.getPrimaryCallTimeoutMicroSecond()); return callable.call(); } @@ -1517,7 +1508,7 @@ public class HTable implements HTableInterface, RegionLocator { // validate for well-formedness public void validatePut(final Put put) throws IllegalArgumentException { - validatePut(put, tableConfiguration.getMaxKeyValueSize()); + validatePut(put, connConfiguration.getMaxKeyValueSize()); } // validate for well-formedness @@ -1579,7 +1570,7 @@ public class HTable implements HTableInterface, RegionLocator { @Override public long getWriteBufferSize() { if (mutator == null) { - return tableConfiguration.getWriteBufferSize(); + return connConfiguration.getWriteBufferSize(); } else { return mutator.getWriteBufferSize(); } @@ -1927,8 +1918,8 @@ public class HTable implements HTableInterface, RegionLocator { this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( new BufferedMutatorParams(tableName) .pool(pool) - .writeBufferSize(tableConfiguration.getWriteBufferSize()) - .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize()) + .writeBufferSize(connConfiguration.getWriteBufferSize()) + .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) ); } return mutator; http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java deleted file mode 100644 index 901e86d..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. - */ - -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -import com.google.common.annotations.VisibleForTesting; - -/** - * - * Configuration is a heavy weight registry that does a lot of string operations and regex matching. - * Method calls into Configuration account for high CPU usage and have huge performance impact. - * This class caches the value in the TableConfiguration object to improve performance. - * see HBASE-12128 - * - */ -@InterfaceAudience.Private -public class TableConfiguration { - - public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; - public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; - public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; - public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1; - - private final long writeBufferSize; - private final int metaOperationTimeout; - private final int operationTimeout; - private final int scannerCaching; - private final long scannerMaxResultSize; - private final int primaryCallTimeoutMicroSecond; - private final int replicaCallTimeoutMicroSecondScan; - private final int retries; - private final int maxKeyValueSize; - - /** - * Constructor - * @param conf Configuration object - */ - TableConfiguration(Configuration conf) { - this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); - - this.metaOperationTimeout = conf.getInt( - HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - - this.operationTimeout = conf.getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - - this.scannerCaching = conf.getInt( - HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); - - this.scannerMaxResultSize = - conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - - this.primaryCallTimeoutMicroSecond = - conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms - - this.replicaCallTimeoutMicroSecondScan = - conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms - - this.retries = conf.getInt( - HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - - this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); - } - - /** - * Constructor - * This is for internal testing purpose (using the default value). - * In real usage, we should read the configuration from the Configuration object. - */ - @VisibleForTesting - protected TableConfiguration() { - this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT; - this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; - this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; - this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; - this.scannerMaxResultSize = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; - this.primaryCallTimeoutMicroSecond = 10000; - this.replicaCallTimeoutMicroSecondScan = 1000000; - this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; - this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; - } - - public long getWriteBufferSize() { - return writeBufferSize; - } - - public int getMetaOperationTimeout() { - return metaOperationTimeout; - } - - public int getOperationTimeout() { - return operationTimeout; - } - - public int getScannerCaching() { - return scannerCaching; - } - - public int getPrimaryCallTimeoutMicroSecond() { - return primaryCallTimeoutMicroSecond; - } - - public int getReplicaCallTimeoutMicroSecondScan() { - return replicaCallTimeoutMicroSecondScan; - } - - public int getRetriesNumber() { - return retries; - } - - public int getMaxKeyValueSize() { - return maxKeyValueSize; - } - - public long getScannerMaxResultSize() { - return scannerMaxResultSize; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index a53fb70..ec6332a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -318,7 +318,7 @@ public abstract class AbstractRpcClient implements RpcClient { public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException { PayloadCarryingRpcController pcrc; - if (controller != null) { + if (controller != null && controller instanceof PayloadCarryingRpcController) { pcrc = (PayloadCarryingRpcController) controller; if (!pcrc.hasCallTimeout()) { pcrc.setCallTimeout(channelOperationTimeout); http://git-wip-us.apache.org/repos/asf/hbase/blob/5202d3c2/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java index 23455d5..231a7cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java @@ -53,7 +53,7 @@ public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcCh RpcCallback callback) { Message response = null; try { - response = callExecService(method, request, responsePrototype); + response = callExecService(controller, method, request, responsePrototype); } catch (IOException ioe) { LOG.warn("Call failed on IOException", ioe); ResponseConverter.setControllerException(controller, ioe); @@ -70,12 +70,13 @@ public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcCh Message request, Message responsePrototype) throws ServiceException { try { - return callExecService(method, request, responsePrototype); + return callExecService(controller, method, request, responsePrototype); } catch (IOException ioe) { throw new ServiceException("Error calling method "+method.getFullName(), ioe); } } - protected abstract Message callExecService(Descriptors.MethodDescriptor method, - Message request, Message responsePrototype) throws IOException; + protected abstract Message callExecService(RpcController controller, + Descriptors.MethodDescriptor method, Message request, Message responsePrototype) + throws IOException; }