phoenix-commits mailing list archives

From sama...@apache.org
Subject phoenix git commit: PHOENIX-3994 Index RPC priority still depends on the controller factory property in hbase-site.xml
Date Tue, 11 Jul 2017 20:29:05 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.11-HBase-1.2 bc6370288 -> 3dfcbb9c6


PHOENIX-3994 Index RPC priority still depends on the controller factory property in hbase-site.xml


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3dfcbb9c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3dfcbb9c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3dfcbb9c

Branch: refs/heads/4.11-HBase-1.2
Commit: 3dfcbb9c63f120b2aa21286bb7f4bb520f19402e
Parents: bc63702
Author: Samarth Jain <samarth@apache.org>
Authored: Tue Jul 11 13:28:59 2017 -0700
Committer: Samarth Jain <samarth@apache.org>
Committed: Tue Jul 11 13:28:59 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/rpc/PhoenixServerRpcIT.java  |  7 +-
 .../hbase/index/write/IndexWriterUtils.java     | 86 +++++++++++++++++---
 .../write/ParallelWriterIndexCommitter.java     |  4 +-
 .../java/org/apache/phoenix/query/BaseTest.java |  3 -
 4 files changed, 80 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
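
In short, the patch makes the index writer obtain its HTables through a CoprocessorHConnection (see the IndexWriterUtils change below), so index RPC priority no longer depends on configuring a custom controller factory in hbase-site.xml, and the tests no longer set one on the client. A minimal sketch of the resulting test setup, mirroring the PhoenixServerRpcIT change below (setUpTestDriver, TestPhoenixIndexRpcSchedulerFactory and ReadOnlyProps are the Phoenix test helpers shown in the diff):

    // Server keeps its custom RPC scheduler factory; the client no longer needs
    // RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY from hbase-site.xml.
    Map<String, String> serverProps = Collections.singletonMap(
            RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
            TestPhoenixIndexRpcSchedulerFactory.class.getName());
    Map<String, String> clientProps = Collections.emptyMap();
    setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
            new ReadOnlyProps(clientProps.entrySet().iterator()));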


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3dfcbb9c/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index b9e4fff..6119548 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -28,6 +28,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -68,8 +69,7 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
     	Map<String, String> serverProps = Collections.singletonMap(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
         		TestPhoenixIndexRpcSchedulerFactory.class.getName());
         // use the standard rpc controller for client rpc, so that we can isolate server rpc and ensure they use the correct queue
-    	Map<String, String> clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
-    			RpcControllerFactory.class.getName());
+        Map<String, String> clientProps = Collections.emptyMap();
         NUM_SLAVES_BASE = 2;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
@@ -143,9 +143,6 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
             Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
             
             TestPhoenixIndexRpcSchedulerFactory.reset();
-            createIndex(conn, indexName + "_1");
-            // verify that that index queue is used and only once (during Upsert Select on server to build the index)
-            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
         }
         finally {
             conn.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3dfcbb9c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
index 6eb657d..703f35c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -17,12 +17,24 @@
  */
 package org.apache.phoenix.hbase.index.write;
 
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.concurrent.GuardedBy;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.client.CoprocessorHConnection;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
 public class IndexWriterUtils {
@@ -61,14 +73,68 @@ public class IndexWriterUtils {
     // private ctor for utilites
   }
 
-  public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
-    // create a simple delegate factory, setup the way we need
-    Configuration conf = env.getConfiguration();
-    // set the number of threads allowed per table.
-    int htableThreads =
-        conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
-    LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
-    IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
-    return new CoprocessorHTableFactory(env);
-  }
+    public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
+        // create a simple delegate factory, setup the way we need
+        Configuration conf = env.getConfiguration();
+        // set the number of threads allowed per table.
+        int htableThreads =
+                conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
+                    IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
+        LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
+        IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
+        if (env instanceof RegionCoprocessorEnvironment) {
+            RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
+            RegionServerServices services = e.getRegionServerServices();
+            if (services instanceof HRegionServer) {
+                return new SimpleTableFactory(conf, (HRegionServer) services);
+            }
+        } else {
+            return new CoprocessorHTableFactory(env);
+        }
+        throw new IllegalStateException("Unexpected environment or settings!");
+    }
+
+    /**
+     * {@code HTableFactory} that creates HTables by using a {@link CoprocessorHConnection}. This
+     * factory was added as a workaround to the bug reported in
+     * https://issues.apache.org/jira/browse/HBASE-18359
+     */
+    private static class SimpleTableFactory implements HTableFactory {
+        @GuardedBy("SimpleTableFactory.this")
+        private HConnection connection;
+        private final Configuration conf;
+        private final HRegionServer server;
+
+        SimpleTableFactory(Configuration conf, HRegionServer server) {
+            this.conf = conf;
+            this.server = server;
+        }
+
+        private synchronized HConnection getConnection(Configuration conf) throws IOException {
+            if (connection == null || connection.isClosed()) {
+                connection = new CoprocessorHConnection(conf, server);
+            }
+            return connection;
+        }
+
+        @Override
+        public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+            return getConnection(conf).getTable(tablename.copyBytesIfNecessary());
+        }
+
+        @Override
+        public void shutdown() {
+            try {
+                getConnection(conf).close();
+            } catch (IOException e) {
+                LOG.error("Exception caught while trying to close the HConnection used by SimpleTableFactory");
+            }
+        }
+
+        @Override
+        public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool)
+                throws IOException {
+            return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool);
+        }
+    }
 }
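
For context, a hypothetical caller-side sketch of the factory introduced above (the variable names env and indexTableName, a byte[], are illustrative; the methods are the ones defined in this diff):

    // Inside a RegionCoprocessorEnvironment, the returned factory is backed by a
    // CoprocessorHConnection, so index writes no longer depend on the controller
    // factory property in hbase-site.xml.
    HTableFactory factory = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
    HTableInterface indexTable = factory.getTable(new ImmutableBytesPtr(indexTableName));
    // ... issue index mutations through indexTable ...
    indexTable.close();
    factory.shutdown();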

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3dfcbb9c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index 7510c5b..a537010 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -158,8 +158,8 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
                                 return null;
                             } catch (IOException ignord) {
                                 // when it's failed we fall back to the standard & slow way
-                                if (LOG.isTraceEnabled()) {
-                                    LOG.trace("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
                                             + ignord);
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3dfcbb9c/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 078c1e8..c0b6cac 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -119,7 +119,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -634,9 +633,7 @@ public abstract class BaseTest {
         //no point doing sanity checks when running tests.
         conf.setBoolean("hbase.table.sanity.checks", false);
         // set the server rpc controller and rpc scheduler factory, used to configure the cluster
-        conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_SERVER_RPC_CONTROLLER_FACTORY);
         conf.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, DEFAULT_RPC_SCHEDULER_FACTORY);
-        
         // override any defaults based on overrideProps
         for (Entry<String,String> entry : overrideProps) {
             conf.set(entry.getKey(), entry.getValue());

