From commits-return-21448-archive-asf-public=cust-asf.ponee.io@phoenix.apache.org Tue May 22 22:39:48 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 4B4F4180638 for ; Tue, 22 May 2018 22:39:46 +0200 (CEST) Received: (qmail 37810 invoked by uid 500); 22 May 2018 20:39:45 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 37791 invoked by uid 99); 22 May 2018 20:39:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 May 2018 20:39:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ECBAEE107B; Tue, 22 May 2018 20:39:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jamestaylor@apache.org To: commits@phoenix.apache.org Date: Tue, 22 May 2018 20:39:44 -0000 Message-Id: <3658d5eb74c64cfcad4f5ceab8640d62@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/22] phoenix git commit: PHOENIX-4685 Properly handle connection caching for Phoenix inside RegionServers(Rajeshbabu) Repository: phoenix Updated Branches: refs/heads/omid2 d0f98a020 -> 2015345a0 PHOENIX-4685 Properly handle connection caching for Phoenix inside RegionServers(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4082c73e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4082c73e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4082c73e Branch: refs/heads/omid2 Commit: 4082c73ee23d901642d8c5bc45ececfcf5e50ede Parents: d0f98a0 Author: Rajeshbabu Chintaguntla Authored: Tue May 8 12:06:49 2018 +0530 Committer: Rajeshbabu Chintaguntla Committed: Tue May 8 12:06:49 2018 +0530 ---------------------------------------------------------------------- .../DelegateRegionCoprocessorEnvironment.java | 7 +- .../UngroupedAggregateRegionObserver.java | 14 +- .../org/apache/phoenix/hbase/index/Indexer.java | 19 +-- .../hbase/index/write/IndexWriterUtils.java | 27 +--- .../index/PhoenixTransactionalIndexer.java | 18 +-- .../org/apache/phoenix/util/ServerUtil.java | 141 ++++++++++++++++--- 6 files changed, 142 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java index 284d53c..a791f4a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.ServerUtil.ConnectionType; /** * Class to encapsulate {@link RegionCoprocessorEnvironment} for phoenix coprocessors. Often we @@ -44,10 +45,10 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn private RegionCoprocessorEnvironment delegate; private HTableFactory tableFactory; - public DelegateRegionCoprocessorEnvironment(Configuration config, RegionCoprocessorEnvironment delegate) { - this.config = config; + public DelegateRegionCoprocessorEnvironment(RegionCoprocessorEnvironment delegate, ConnectionType connectionType) { + this.config = ServerUtil.ConnectionFactory.getTypeSpecificConfiguration(connectionType, delegate.getConfiguration()); this.delegate = delegate; - this.tableFactory = ServerUtil.getDelegateHTableFactory(this, config); + this.tableFactory = ServerUtil.getDelegateHTableFactory(this, connectionType); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 6bee65c..14213f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -144,6 +144,7 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.ServerUtil.ConnectionType; import org.apache.phoenix.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -225,14 +226,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); - compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); - // lower the number of rpc retries, so we don't hang the compaction - compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER, - QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER)); - compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, - e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE, - QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE)); + compactionConfig = ServerUtil.getCompactionConfig(e.getConfiguration()); // For retries of index write failures, use the same # of retries as the rebuilder indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); @@ -984,7 +978,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver InternalScanner internalScanner = scanner; try { long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis(); - DelegateRegionCoprocessorEnvironment compactionConfEnv = new DelegateRegionCoprocessorEnvironment(compactionConfig, c.getEnvironment()); + DelegateRegionCoprocessorEnvironment compactionConfEnv = + new DelegateRegionCoprocessorEnvironment(c.getEnvironment(), + ConnectionType.COMPACTION_CONNECTION); StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector( compactionConfEnv, table.getNameAsString(), clientTimeStamp, store.getFamily().getName()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 7325cd8..115182b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -94,6 +94,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.ServerUtil.ConnectionType; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; @@ -219,25 +220,11 @@ public class Indexer extends BaseRegionObserver { this.builder = new IndexBuildManager(env); // Clone the config since it is shared - Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); - /* - * Set the rpc controller factory so that the HTables used by IndexWriter would - * set the correct priorities on the remote RPC calls. - */ - clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, - InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); - // lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries, - // which by default uses a multiplier of 10. That is too many retries for our synchronous index writes - clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER, - DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER)); - clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration() - .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE)); - DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env); + DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION); // setup the actual index writer this.writer = new IndexWriter(indexWriterEnv, serverName + "-index-writer"); - this.rowLockWaitDuration = clonedConfig.getInt("hbase.rowlock.wait.duration", + this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); this.lockManager = new LockManager(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java index 0d3004f..ef53b9f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java @@ -26,6 +26,7 @@ import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.ServerUtil.ConnectionType; public class IndexWriterUtils { @@ -50,9 +51,9 @@ public class IndexWriterUtils { * threads. Currently, HBase doesn't support a custom thread-pool to back the HTable via the * coprocesor hooks, so we can't modify this behavior. */ - private static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY = + public static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY = "index.writer.threads.pertable.max"; - private static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE; + public static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE; /** Configuration key that HBase uses to set the max number of threads for an HTable */ public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max"; @@ -79,19 +80,7 @@ public class IndexWriterUtils { } public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) { - // create a simple delegate factory, setup the way we need - Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration()); - setHTableThreads(conf); - return ServerUtil.getDelegateHTableFactory(env, conf); - } - - private static void setHTableThreads(Configuration conf) { - // set the number of threads allowed per table. - int htableThreads = - conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, - IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS); - LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable."); - IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads); + return ServerUtil.getDelegateHTableFactory(env, ConnectionType.INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS); } /** @@ -99,12 +88,8 @@ public class IndexWriterUtils { * instead to avoid tying up the handler */ public static HTableFactory getNoRetriesHTableFactory(CoprocessorEnvironment env) { - Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration()); - setHTableThreads(conf); - // note in HBase 2+, numTries = numRetries + 1 - // in prior versions, numTries = numRetries - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - return ServerUtil.getDelegateHTableFactory(env, conf); + return ServerUtil.getDelegateHTableFactory(env, + ConnectionType.INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index bdfcaff..02296c9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -58,6 +58,7 @@ import org.apache.phoenix.trace.util.NullSpan; import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.ServerUtil.ConnectionType; import org.apache.phoenix.util.TransactionUtil; /** @@ -94,22 +95,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Configuration conf = e.getConfiguration(); String serverName = env.getRegionServerServices().getServerName().getServerName(); codec = new PhoenixIndexCodec(conf, env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), env.getRegionInfo().getTable().getName()); - // Clone the config since it is shared - Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); - /* - * Set the rpc controller factory so that the HTables used by IndexWriter would - * set the correct priorities on the remote RPC calls. - */ - clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, - InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); - // lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries, - // which by default uses a multiplier of 10. That is too many retries for our synchronous index writes - clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER, - DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER)); - clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration() - .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE)); - DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env); + DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION); // setup the actual index writer // For transactional tables, we keep the index active upon a write failure // since we have the all versus none behavior for transactions. Also, we http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java index 4b3cc43..2dab076 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -17,11 +17,17 @@ */ package org.apache.phoenix.util; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER; + import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -35,12 +41,15 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.CoprocessorHConnection; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -52,8 +61,13 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; +import org.jboss.netty.util.internal.ConcurrentHashMap; @SuppressWarnings("deprecation") @@ -269,12 +283,12 @@ public class ServerUtil { endKey) < 0)); } - public static HTableFactory getDelegateHTableFactory(CoprocessorEnvironment env, Configuration conf) { + public static HTableFactory getDelegateHTableFactory(CoprocessorEnvironment env, ConnectionType connectionType) { if (env instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env; RegionServerServices services = e.getRegionServerServices(); if (services instanceof HRegionServer) { - return new CoprocessorHConnectionTableFactory(conf, (HRegionServer) services); + return new CoprocessorHConnectionTableFactory(env.getConfiguration(), (HRegionServer) services, connectionType); } } return new CoprocessorHTableFactory(env); @@ -286,44 +300,133 @@ public class ServerUtil { * https://issues.apache.org/jira/browse/HBASE-18359 */ public static class CoprocessorHConnectionTableFactory implements HTableFactory { - @GuardedBy("CoprocessorHConnectionTableFactory.this") - private HConnection connection; private final Configuration conf; private final HRegionServer server; + private final ConnectionType connectionType; - CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server) { + CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server, ConnectionType connectionType) { this.conf = conf; this.server = server; + this.connectionType = connectionType; } - private synchronized HConnection getConnection(Configuration conf) throws IOException { - if (connection == null || connection.isClosed()) { - connection = new CoprocessorHConnection(conf, server); - } - return connection; + private ClusterConnection getConnection() throws IOException { + return ConnectionFactory.getConnection(connectionType, conf, server); } @Override public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { - return getConnection(conf).getTable(tablename.copyBytesIfNecessary()); + return getConnection().getTable(tablename.copyBytesIfNecessary()); } @Override public synchronized void shutdown() { - try { - if (connection != null && !connection.isClosed()) { - connection.close(); - } - } catch (Throwable e) { - LOG.warn("Error while trying to close the HConnection used by CoprocessorHConnectionTableFactory", e); - } + // We need not close the cached connections as they are shared across the server. } @Override public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException { - return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool); + return getConnection().getTable(tablename.copyBytesIfNecessary(), pool); + } + } + + public static enum ConnectionType { + COMPACTION_CONNECTION, + INDEX_WRITER_CONNECTION, + INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS, + INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES, + DEFAULT_SERVER_CONNECTION; + } + + public static class ConnectionFactory { + + private static Map connections = + new ConcurrentHashMap(); + + public static ClusterConnection getConnection(final ConnectionType connectionType, final Configuration conf, final HRegionServer server) throws IOException { + ClusterConnection connection = null; + if((connection = connections.get(connectionType)) == null) { + synchronized (CoprocessorHConnectionTableFactory.class) { + if(connections.get(connectionType) == null) { + connection = new CoprocessorHConnection(conf, server); + connections.put(connectionType, connection); + return connection; + } + } + } + return connection; } + + public static Configuration getTypeSpecificConfiguration(ConnectionType connectionType, Configuration conf) { + switch (connectionType) { + case COMPACTION_CONNECTION: + return getCompactionConfig(conf); + case DEFAULT_SERVER_CONNECTION: + return conf; + case INDEX_WRITER_CONNECTION: + return getIndexWriterConnection(conf); + case INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS: + return getIndexWriterConfigurationWithCustomThreads(conf); + case INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES: + return getNoRetriesIndexWriterConfigurationWithCustomThreads(conf); + default: + return conf; + } + } + } + + public static Configuration getCompactionConfig(Configuration conf) { + Configuration compactionConfig = PropertiesUtil.cloneConfig(conf); + // lower the number of rpc retries, so we don't hang the compaction + compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + conf.getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER, + QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER)); + compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, + conf.getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE, + QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE)); + return compactionConfig; + } + + public static Configuration getIndexWriterConnection(Configuration conf) { + Configuration clonedConfig = PropertiesUtil.cloneConfig(conf); + /* + * Set the rpc controller factory so that the HTables used by IndexWriter would + * set the correct priorities on the remote RPC calls. + */ + clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); + // lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries, + // which by default uses a multiplier of 10. That is too many retries for our synchronous index writes + clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + conf.getInt(INDEX_WRITER_RPC_RETRIES_NUMBER, + DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER)); + clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, conf + .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE)); + return clonedConfig; + } + + public static Configuration getIndexWriterConfigurationWithCustomThreads(Configuration conf) { + Configuration clonedConfig = PropertiesUtil.cloneConfig(conf); + setHTableThreads(clonedConfig); + return clonedConfig; + } + + private static void setHTableThreads(Configuration conf) { + // set the number of threads allowed per table. + int htableThreads = + conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, + IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS); + IndexManagementUtil.setIfNotSet(conf, IndexWriterUtils.HTABLE_THREAD_KEY, htableThreads); + } + + public static Configuration getNoRetriesIndexWriterConfigurationWithCustomThreads(Configuration conf) { + Configuration clonedConf = getIndexWriterConfigurationWithCustomThreads(conf); + // note in HBase 2+, numTries = numRetries + 1 + // in prior versions, numTries = numRetries + clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + return clonedConf; + } }