phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [01/22] phoenix git commit: PHOENIX-4685 Properly handle connection caching for Phoenix inside RegionServers(Rajeshbabu)
Date Tue, 22 May 2018 20:39:44 GMT
Repository: phoenix
Updated Branches:
  refs/heads/omid2 d0f98a020 -> 2015345a0


PHOENIX-4685 Properly handle connection caching for Phoenix inside RegionServers(Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4082c73e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4082c73e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4082c73e

Branch: refs/heads/omid2
Commit: 4082c73ee23d901642d8c5bc45ececfcf5e50ede
Parents: d0f98a0
Author: Rajeshbabu Chintaguntla <rajeshbabu@apache.org>
Authored: Tue May 8 12:06:49 2018 +0530
Committer: Rajeshbabu Chintaguntla <rajeshbabu@apache.org>
Committed: Tue May 8 12:06:49 2018 +0530

----------------------------------------------------------------------
 .../DelegateRegionCoprocessorEnvironment.java   |   7 +-
 .../UngroupedAggregateRegionObserver.java       |  14 +-
 .../org/apache/phoenix/hbase/index/Indexer.java |  19 +--
 .../hbase/index/write/IndexWriterUtils.java     |  27 +---
 .../index/PhoenixTransactionalIndexer.java      |  18 +--
 .../org/apache/phoenix/util/ServerUtil.java     | 141 ++++++++++++++++---
 6 files changed, 142 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
index 284d53c..a791f4a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
 /**
  * Class to encapsulate {@link RegionCoprocessorEnvironment} for phoenix coprocessors. Often we
@@ -44,10 +45,10 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn
     private RegionCoprocessorEnvironment delegate;
     private HTableFactory tableFactory;
 
-    public DelegateRegionCoprocessorEnvironment(Configuration config, RegionCoprocessorEnvironment delegate) {
-        this.config = config;
+    public DelegateRegionCoprocessorEnvironment(RegionCoprocessorEnvironment delegate, ConnectionType connectionType) {
+        this.config = ServerUtil.ConnectionFactory.getTypeSpecificConfiguration(connectionType, delegate.getConfiguration());
         this.delegate = delegate;
-        this.tableFactory = ServerUtil.getDelegateHTableFactory(this, config);
+        this.tableFactory = ServerUtil.getDelegateHTableFactory(this, connectionType);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 6bee65c..14213f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -144,6 +144,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
 import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -225,14 +226,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
             InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
 
-        compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
-        // lower the number of rpc retries, so we don't hang the compaction
-        compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-            e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER,
-                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER));
-        compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
-            e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
-                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
+        compactionConfig = ServerUtil.getCompactionConfig(e.getConfiguration());
 
         // For retries of index write failures, use the same # of retries as the rebuilder
         indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
@@ -984,7 +978,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     InternalScanner internalScanner = scanner;
                     try {
                         long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
-                        DelegateRegionCoprocessorEnvironment compactionConfEnv = new DelegateRegionCoprocessorEnvironment(compactionConfig, c.getEnvironment());
+                        DelegateRegionCoprocessorEnvironment compactionConfEnv =
+                                new DelegateRegionCoprocessorEnvironment(c.getEnvironment(),
+                                        ConnectionType.COMPACTION_CONNECTION);
                         StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
                             compactionConfEnv, table.getNameAsString(), clientTimeStamp,
                             store.getFamily().getName());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 7325cd8..115182b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -94,6 +94,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
@@ -219,25 +220,11 @@ public class Indexer extends BaseRegionObserver {
     
         this.builder = new IndexBuildManager(env);
         // Clone the config since it is shared
-        Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
-        /*
-         * Set the rpc controller factory so that the HTables used by IndexWriter would
-         * set the correct priorities on the remote RPC calls.
-         */
-        clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
-                InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
-        // lower the number of rpc retries.  We inherit config from HConnectionManager#setServerSideHConnectionRetries,
-        // which by default uses a multiplier of 10.  That is too many retries for our synchronous index writes
-        clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-            env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
-                DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
-        clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration()
-            .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
-        DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
+        DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
         // setup the actual index writer
         this.writer = new IndexWriter(indexWriterEnv, serverName + "-index-writer");
         
-        this.rowLockWaitDuration = clonedConfig.getInt("hbase.rowlock.wait.duration",
+        this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration",
                 DEFAULT_ROWLOCK_WAIT_DURATION);
         this.lockManager = new LockManager();
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
index 0d3004f..ef53b9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
 public class IndexWriterUtils {
 
@@ -50,9 +51,9 @@ public class IndexWriterUtils {
    * threads. Currently, HBase doesn't support a custom thread-pool to back the HTable via the
    * coprocesor hooks, so we can't modify this behavior.
    */
-  private static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY =
+  public static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY =
       "index.writer.threads.pertable.max";
-  private static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE;
+  public static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE;
 
   /** Configuration key that HBase uses to set the max number of threads for an HTable */
   public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max";
@@ -79,19 +80,7 @@ public class IndexWriterUtils {
   }
 
     public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
-        // create a simple delegate factory, setup the way we need
-        Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration());
-        setHTableThreads(conf);
-        return ServerUtil.getDelegateHTableFactory(env, conf);
-    }
-
-    private static void setHTableThreads(Configuration conf) {
-        // set the number of threads allowed per table.
-        int htableThreads =
-                conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
-                    IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
-        LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
-        IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
+        return ServerUtil.getDelegateHTableFactory(env, ConnectionType.INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS);
     }
 
     /**
@@ -99,12 +88,8 @@ public class IndexWriterUtils {
      * instead to avoid tying up the handler
      */
     public static HTableFactory getNoRetriesHTableFactory(CoprocessorEnvironment env) {
-        Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration());
-        setHTableThreads(conf);
-        // note in HBase 2+, numTries = numRetries + 1
-        // in prior versions, numTries = numRetries
-        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
-        return ServerUtil.getDelegateHTableFactory(env, conf);
+        return ServerUtil.getDelegateHTableFactory(env,
+            ConnectionType.INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index bdfcaff..02296c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -58,6 +58,7 @@ import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
 import org.apache.phoenix.util.TransactionUtil;
 
 /**
@@ -94,22 +95,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         Configuration conf = e.getConfiguration();
         String serverName = env.getRegionServerServices().getServerName().getServerName();
         codec = new PhoenixIndexCodec(conf, env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), env.getRegionInfo().getTable().getName());
-        // Clone the config since it is shared
-        Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
-        /*
-         * Set the rpc controller factory so that the HTables used by IndexWriter would
-         * set the correct priorities on the remote RPC calls.
-         */
-        clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
-                InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
-        // lower the number of rpc retries.  We inherit config from HConnectionManager#setServerSideHConnectionRetries,
-        // which by default uses a multiplier of 10.  That is too many retries for our synchronous index writes
-        clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-            env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
-                DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
-        clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration()
-            .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
-        DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
+        DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
         // setup the actual index writer
         // For transactional tables, we keep the index active upon a write failure
         // since we have the all versus none behavior for transactions. Also, we

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4082c73e/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 4b3cc43..2dab076 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -17,11 +17,17 @@
  */
 package org.apache.phoenix.util;
 
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -35,12 +41,15 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.CoprocessorHConnection;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -52,8 +61,13 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
 
 
 @SuppressWarnings("deprecation")
@@ -269,12 +283,12 @@ public class ServerUtil {
                     endKey) < 0));
     }
 
-    public static HTableFactory getDelegateHTableFactory(CoprocessorEnvironment env, Configuration conf) {
+    public static HTableFactory getDelegateHTableFactory(CoprocessorEnvironment env, ConnectionType connectionType) {
         if (env instanceof RegionCoprocessorEnvironment) {
             RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
             RegionServerServices services = e.getRegionServerServices();
             if (services instanceof HRegionServer) {
-                return new CoprocessorHConnectionTableFactory(conf, (HRegionServer) services);
+                return new CoprocessorHConnectionTableFactory(env.getConfiguration(), (HRegionServer) services, connectionType);
             }
         }
         return new CoprocessorHTableFactory(env);
@@ -286,44 +300,133 @@ public class ServerUtil {
      * https://issues.apache.org/jira/browse/HBASE-18359
      */
     public static class CoprocessorHConnectionTableFactory implements HTableFactory {
-        @GuardedBy("CoprocessorHConnectionTableFactory.this")
-        private HConnection connection;
         private final Configuration conf;
         private final HRegionServer server;
+        private final ConnectionType connectionType;
 
-        CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server) {
+        CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server, ConnectionType connectionType) {
             this.conf = conf;
             this.server = server;
+            this.connectionType = connectionType;
         }
 
-        private synchronized HConnection getConnection(Configuration conf) throws IOException {
-            if (connection == null || connection.isClosed()) {
-                connection = new CoprocessorHConnection(conf, server);
-            }
-            return connection;
+        private ClusterConnection getConnection() throws IOException {
+            return ConnectionFactory.getConnection(connectionType, conf, server);
         }
 
         @Override
         public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
-            return getConnection(conf).getTable(tablename.copyBytesIfNecessary());
+            return getConnection().getTable(tablename.copyBytesIfNecessary());
         }
 
         @Override
         public synchronized void shutdown() {
-            try {
-                if (connection != null && !connection.isClosed()) {
-                    connection.close();
-                }
-            } catch (Throwable e) {
-                LOG.warn("Error while trying to close the HConnection used by CoprocessorHConnectionTableFactory", e);
-            }
+            // We need not close the cached connections as they are shared across the server.
         }
 
         @Override
         public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool)
                 throws IOException {
-            return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool);
+            return getConnection().getTable(tablename.copyBytesIfNecessary(), pool);
+        }
+    }
+
+    public static enum ConnectionType {
+        COMPACTION_CONNECTION,
+        INDEX_WRITER_CONNECTION,
+        INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS,
+        INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES,
+        DEFAULT_SERVER_CONNECTION;
+    }
+
+    public static class ConnectionFactory {
+        
+        private static Map<ConnectionType, ClusterConnection> connections =
+                new ConcurrentHashMap<ConnectionType, ClusterConnection>();
+
+        public static ClusterConnection getConnection(final ConnectionType connectionType, final Configuration conf, final HRegionServer server) throws IOException {
+            ClusterConnection connection = null;
+            if((connection = connections.get(connectionType)) == null) {
+                synchronized (CoprocessorHConnectionTableFactory.class) {
+                    if(connections.get(connectionType) == null) {
+                        connection = new CoprocessorHConnection(conf, server);
+                        connections.put(connectionType, connection);
+                        return connection;
+                    }
+                }
+            }
+            return connection;
         }
+
+        public static Configuration getTypeSpecificConfiguration(ConnectionType connectionType, Configuration conf) {
+            switch (connectionType) {
+            case COMPACTION_CONNECTION:
+                return getCompactionConfig(conf);
+            case DEFAULT_SERVER_CONNECTION:
+                return conf;
+            case INDEX_WRITER_CONNECTION:
+                return getIndexWriterConnection(conf);
+            case INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS:
+                return getIndexWriterConfigurationWithCustomThreads(conf);
+            case INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES:
+                return getNoRetriesIndexWriterConfigurationWithCustomThreads(conf);
+            default:
+                return conf;
+            }
+        }
+    }
+
+    public static Configuration getCompactionConfig(Configuration conf) {
+        Configuration compactionConfig = PropertiesUtil.cloneConfig(conf);
+        // lower the number of rpc retries, so we don't hang the compaction
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            conf.getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER));
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
+            conf.getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
+        return compactionConfig;
+    }
+
+    public static Configuration getIndexWriterConnection(Configuration conf) {
+        Configuration clonedConfig = PropertiesUtil.cloneConfig(conf);
+        /*
+         * Set the rpc controller factory so that the HTables used by IndexWriter would
+         * set the correct priorities on the remote RPC calls.
+         */
+        clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+                InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
+        // lower the number of rpc retries.  We inherit config from HConnectionManager#setServerSideHConnectionRetries,
+        // which by default uses a multiplier of 10.  That is too many retries for our synchronous index writes
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            conf.getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
+                DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, conf
+            .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
+        return clonedConfig;
+    }
+
+    public static Configuration getIndexWriterConfigurationWithCustomThreads(Configuration conf) {
+        Configuration clonedConfig = PropertiesUtil.cloneConfig(conf);
+        setHTableThreads(clonedConfig);
+        return clonedConfig;
+    }
+
+    private static void setHTableThreads(Configuration conf) {
+        // set the number of threads allowed per table.
+        int htableThreads =
+                conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
+                    IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
+        IndexManagementUtil.setIfNotSet(conf, IndexWriterUtils.HTABLE_THREAD_KEY, htableThreads);
+    }
+    
+    public static Configuration getNoRetriesIndexWriterConfigurationWithCustomThreads(Configuration conf) {
+        Configuration clonedConf = getIndexWriterConfigurationWithCustomThreads(conf);
+        // note in HBase 2+, numTries = numRetries + 1
+        // in prior versions, numTries = numRetries
+        clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+        return clonedConf;
+
     }
 
 }


Mime
View raw message