Return-Path: X-Original-To: apmail-phoenix-commits-archive@minotaur.apache.org Delivered-To: apmail-phoenix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 61DF1110BC for ; Wed, 23 Jul 2014 18:12:38 +0000 (UTC) Received: (qmail 48639 invoked by uid 500); 23 Jul 2014 18:12:38 -0000 Delivered-To: apmail-phoenix-commits-archive@phoenix.apache.org Received: (qmail 48597 invoked by uid 500); 23 Jul 2014 18:12:38 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 48571 invoked by uid 99); 23 Jul 2014 18:12:38 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Jul 2014 18:12:38 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DE23B9B19DD; Wed, 23 Jul 2014 18:12:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jyates@apache.org To: commits@phoenix.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: PHOENIX-938: Use higher priority queue for index updates to prevent deadlock Date: Wed, 23 Jul 2014 18:12:37 +0000 (UTC) Repository: phoenix Updated Branches: refs/heads/master 36a41c86a -> 1954c717a PHOENIX-938: Use higher priority queue for index updates to prevent deadlock Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1954c717 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1954c717 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1954c717 Branch: refs/heads/master Commit: 1954c717a12561bdc2184ba23c53afae3f900084 Parents: 36a41c8 Author: Jesse Yates Authored: Wed Jul 23 11:10:52 2014 -0700 Committer: Jesse Yates Committed: Wed Jul 23 11:10:52 2014 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/index/IndexHandlerIT.java | 166 +++++++++++++++++++ .../hbase/ipc/PhoenixIndexRpcScheduler.java | 125 ++++++++++++++ .../phoenix/hbase/index/IndexQosCompat.java | 98 +++++++++++ .../index/IndexQosRpcControllerFactory.java | 86 ++++++++++ .../hbase/index/builder/BaseIndexBuilder.java | 2 - .../hbase/index/builder/IndexBuildManager.java | 1 - .../hbase/index/builder/IndexBuilder.java | 1 - .../ipc/PhoenixIndexRpcSchedulerFactory.java | 104 ++++++++++++ .../index/table/CoprocessorHTableFactory.java | 65 ++++---- .../java/org/apache/phoenix/util/IndexUtil.java | 6 + .../hbase/ipc/PhoenixIndexRpcSchedulerTest.java | 99 +++++++++++ .../PhoenixIndexRpcSchedulerFactoryTest.java | 105 ++++++++++++ pom.xml | 4 +- 13 files changed, 827 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java new file mode 100644 index 0000000..a829ae1 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.junit.Assert.assertEquals; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory; +import org.apache.phoenix.hbase.index.TableName; +import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Comprehensive test that ensures we are adding custom index handlers + */ +public class IndexHandlerIT { + + public static class CountingIndexClientRpcFactory extends RpcControllerFactory { + + private IndexQosRpcControllerFactory delegate; + + public CountingIndexClientRpcFactory(Configuration conf) { + super(conf); + this.delegate = new IndexQosRpcControllerFactory(conf); + } + + @Override + public PayloadCarryingRpcController newController() { + PayloadCarryingRpcController controller = delegate.newController(); + return new CountingIndexClientRpcController(controller); + } + + @Override + public PayloadCarryingRpcController newController(CellScanner cellScanner) { + PayloadCarryingRpcController controller = delegate.newController(cellScanner); + return new CountingIndexClientRpcController(controller); + } + + @Override + public PayloadCarryingRpcController newController(List cellIterables) { + PayloadCarryingRpcController controller = delegate.newController(cellIterables); + return new CountingIndexClientRpcController(controller); + } + } + + public static class CountingIndexClientRpcController extends + DelegatingPayloadCarryingRpcController { + + private static Map priorityCounts = new HashMap(); + + public CountingIndexClientRpcController(PayloadCarryingRpcController delegate) { + super(delegate); + } + + @Override + public void setPriority(int pri) { + Integer count = priorityCounts.get(pri); + if (count == 0) { + count = new Integer(0); + } + count = count.intValue() + 1; + priorityCounts.put(pri, count); + + } + } + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final byte[] row = Bytes.toBytes("row"); + private static final byte[] family = Bytes.toBytes("FAM"); + private static final byte[] qual = Bytes.toBytes("qual"); + private static final HColumnDescriptor FAM1 = new HColumnDescriptor(family); + + @Rule + public TableName TestTable = new TableName(); + + @BeforeClass + public static void setupCluster() throws Exception { + UTIL.startMiniCluster(); + } + + @AfterClass + public static void shutdownCluster() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setup() throws Exception { + HTableDescriptor desc = + new HTableDescriptor(org.apache.hadoop.hbase.TableName.valueOf(TestTable + .getTableNameString())); + desc.addFamily(FAM1); + + // create the table + HBaseAdmin admin = UTIL.getHBaseAdmin(); + admin.createTable(desc); + } + + @After + public void cleanup() throws Exception { + HBaseAdmin admin = UTIL.getHBaseAdmin(); + admin.disableTable(TestTable.getTableName()); + admin.deleteTable(TestTable.getTableName()); + } + + @Test + public void testClientWritesWithPriority() throws Exception { + Configuration conf = new Configuration(UTIL.getConfiguration()); + // add the keys for our rpc factory + conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + CountingIndexClientRpcFactory.class.getName()); + // and set the index table as the current table + conf.setStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY, + TestTable.getTableNameString()); + HTable table = new HTable(conf, TestTable.getTableName()); + + // do a write to the table + Put p = new Put(row); + p.add(family, qual, new byte[] { 1, 0, 1, 0 }); + table.put(p); + table.flushCommits(); + + // check the counts on the rpc controller + assertEquals("Didn't get the expected number of index priority writes!", (int) 1, + (int) CountingIndexClientRpcController.priorityCounts + .get(PhoenixIndexRpcSchedulerFactory.DEFAULT_INDEX_MIN_PRIORITY)); + + table.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java new file mode 100644 index 0000000..8349321 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ipc.BalancedQueueRpcExecutor; +import org.apache.hadoop.hbase.ipc.CallRunner; +import org.apache.hadoop.hbase.ipc.RpcExecutor; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcServer; + +import com.google.common.annotations.VisibleForTesting; + +/** + * {@link RpcScheduler} that first checks to see if this is an index update before passing off the + * call to the delegate {@link RpcScheduler}. + *

+ * We reserve the range (1000, 1050], by default (though it is configurable), for index priority + * writes. Currently, we don't do any prioritization within that range - all index writes are + * treated with the same priority and put into the same queue. + */ +public class PhoenixIndexRpcScheduler extends RpcScheduler { + + // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4 + public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "ipc.server.callqueue.read.share"; + public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = + "ipc.server.callqueue.handler.factor"; + private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; + + private RpcScheduler delegate; + private int minPriority; + private int maxPriority; + private RpcExecutor callExecutor; + + public PhoenixIndexRpcScheduler(int indexHandlerCount, Configuration conf, + RpcScheduler delegate, int minPriority, int maxPriority) { + int maxQueueLength = + conf.getInt("ipc.server.max.callqueue.length", indexHandlerCount + * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + + // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4 + float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); + int numCallQueues = + Math.max(1, (int) Math.round(indexHandlerCount * callQueuesHandlersFactor)); + + this.minPriority = minPriority; + this.maxPriority = maxPriority; + this.delegate = delegate; + + this.callExecutor = + new BalancedQueueRpcExecutor("Index", indexHandlerCount, numCallQueues, + maxQueueLength); + } + + @Override + public void init(Context context) { + delegate.init(context); + } + + @Override + public void start() { + delegate.start(); + } + + @Override + public void stop() { + delegate.stop(); + callExecutor.stop(); + } + + @Override + public void dispatch(CallRunner callTask) throws InterruptedException, IOException { + RpcServer.Call call = callTask.getCall(); + int priority = call.header.getPriority(); + if (minPriority <= priority && priority < maxPriority) { + callExecutor.dispatch(callTask); + } else { + delegate.dispatch(callTask); + } + } + + @Override + public int getGeneralQueueLength() { + // not the best way to calculate, but don't have a better way to hook + // into metrics at the moment + return this.delegate.getGeneralQueueLength() + this.callExecutor.getQueueLength(); + } + + @Override + public int getPriorityQueueLength() { + return this.delegate.getPriorityQueueLength(); + } + + @Override + public int getReplicationQueueLength() { + return this.delegate.getReplicationQueueLength(); + } + + @Override + public int getActiveRpcHandlerCount() { + return this.delegate.getActiveRpcHandlerCount() + this.callExecutor.getActiveHandlerCount(); + } + + @VisibleForTesting + public void setExecutorForTesting(RpcExecutor executor) { + this.callExecutor = executor; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosCompat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosCompat.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosCompat.java new file mode 100644 index 0000000..5681d71 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosCompat.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory; + +/** + * Helper class to avoid loading HBase 0.98.3+ classes in older HBase installations + */ +public class IndexQosCompat { + + private static final Log LOG = LogFactory.getLog(IndexQosCompat.class); + + /** + * Full class name of the RpcControllerFactory. This is copied here so we don't need the static reference, so we can work with older versions of HBase 0.98, which don't have this class + */ + private static final String HBASE_RPC_CONTROLLER_CLASS_NAME = + "org.apache.hadoop.hbase.ipc.RpcControllerFactory"; + private static volatile boolean checked = false; + private static boolean rpcControllerExists = false; + + private IndexQosCompat() { + // private ctor for util class + } + + /** + * @param tableName name of the index table + * @return configuration key for if a table should have Index QOS writes (its a target index + * table) + */ + public static String getTableIndexQosConfKey(String tableName) { + return "phoenix.index.table.qos._" + tableName; + } + + /** + * Set the index rpc controller, if the rpc controller exists. No-op if there the RpcController + * is not on the classpath. + * @param conf to update + */ + public static void setPhoenixIndexRpcController(Configuration conf) { + if (rpcControllerExists()) { + // then we can load the class just fine + conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + PhoenixIndexRpcSchedulerFactory.class.getName()); + } + } + + private static boolean rpcControllerExists() { + if (checked) { + synchronized (IndexQosCompat.class) { + if (!checked) { + // try loading the class + try { + Class.forName(HBASE_RPC_CONTROLLER_CLASS_NAME); + rpcControllerExists = true; + } catch (ClassNotFoundException e) { + LOG.warn("RpcControllerFactory doesn't exist, not setting custom index handler properties."); + rpcControllerExists = false; + } + + checked = true; + } + } + } + return rpcControllerExists; + } + + /** + * Ensure that the given table is enabled for index QOS handling + * @param conf configuration to read/update + * @param tableName name of the table to configure for index handlers + */ + public static void enableIndexQosForTable(Configuration conf, String tableName) { + String confKey = IndexQosCompat.getTableIndexQosConfKey(tableName); + if (conf.get(confKey) == null) { + conf.setBoolean(confKey, true); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java new file mode 100644 index 0000000..aa8b8d1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory; + +/** + * {@link RpcControllerFactory} that overrides the standard {@link PayloadCarryingRpcController} to + * allow the configured index tables (via {@link #INDEX_TABLE_NAMES_KEY}) to use the Index priority. + */ +public class IndexQosRpcControllerFactory extends RpcControllerFactory { + + public static final String INDEX_TABLE_NAMES_KEY = "phoenix.index.rpc.controller.index-tables"; + + public IndexQosRpcControllerFactory(Configuration conf) { + super(conf); + } + + @Override + public PayloadCarryingRpcController newController() { + PayloadCarryingRpcController delegate = super.newController(); + return new IndexQosRpcController(delegate, conf); + } + + @Override + public PayloadCarryingRpcController newController(CellScanner cellScanner) { + PayloadCarryingRpcController delegate = super.newController(cellScanner); + return new IndexQosRpcController(delegate, conf); + } + + @Override + public PayloadCarryingRpcController newController(List cellIterables) { + PayloadCarryingRpcController delegate = super.newController(cellIterables); + return new IndexQosRpcController(delegate, conf); + } + + private class IndexQosRpcController extends DelegatingPayloadCarryingRpcController { + + private Configuration conf; + private int priority; + + public IndexQosRpcController(PayloadCarryingRpcController delegate, Configuration conf) { + super(delegate); + this.conf = conf; + this.priority = PhoenixIndexRpcSchedulerFactory.getMinPriority(conf); + } + + @Override + public void setPriority(final TableName tn) { + // if its an index table, then we override to the index priority + if (isIndexTable(tn)) { + setPriority(this.priority); + } else { + super.setPriority(tn); + } + } + + private boolean isIndexTable(TableName tn) { + return conf.get(IndexQosCompat.getTableIndexQosConfKey(tn.getNameAsString())) == null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index c91c511..f9df296 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -24,8 +24,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.util.Pair; - import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder; /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index 45e5495..ba9534c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Pair; - import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner; import org.apache.phoenix.hbase.index.parallel.Task; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java index aa2225b..1c9782e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Pair; - import org.apache.phoenix.hbase.index.Indexer; /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java new file mode 100644 index 0000000..500db7c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.ipc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; +import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; + +import com.google.common.base.Preconditions; + +/** + * Factory to create a {@link PhoenixIndexRpcScheduler}. In this package so we can access the + * {@link SimpleRpcSchedulerFactory}. + */ +public class PhoenixIndexRpcSchedulerFactory implements RpcSchedulerFactory { + + private static final Log LOG = LogFactory.getLog(PhoenixIndexRpcSchedulerFactory.class); + + private static final String INDEX_HANDLER_COUNT_KEY = + "org.apache.phoenix.regionserver.index.handler.count"; + private static final int DEFAULT_INDEX_HANDLER_COUNT = 30; + + /** + * HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate + * and give some room for things in the middle + */ + public static final int DEFAULT_INDEX_MIN_PRIORITY = 1000; + public static final int DEFAULT_INDEX_MAX_PRIORITY = 1050; + public static final String MIN_INDEX_PRIOIRTY_KEY = + "org.apache.phoenix.regionserver.index.priority.min"; + public static final String MAX_INDEX_PRIOIRTY_KEY = + "org.apache.phoenix.regionserver.index.priority.max"; + + private static final String VERSION_TOO_OLD_FOR_INDEX_RPC = + "Running an older version of HBase (less than 0.98.4), Phoenix index RPC handling cannot be enabled."; + + @Override + public RpcScheduler create(Configuration conf, RegionServerServices services) { + // create the delegate scheduler + RpcScheduler delegate; + try { + // happens in <=0.98.4 where the scheduler factory is not visible + delegate = new SimpleRpcSchedulerFactory().create(conf, services); + } catch (IllegalAccessError e) { + LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC); + throw e; + } + try { + // make sure we are on a version that phoenix can support + Class.forName("org.apache.hadoop.hbase.ipc.RpcExecutor"); + } catch (ClassNotFoundException e) { + LOG.error(VERSION_TOO_OLD_FOR_INDEX_RPC + + " Instead, using falling back to Simple RPC scheduling."); + return delegate; + } + + int indexHandlerCount = conf.getInt(INDEX_HANDLER_COUNT_KEY, DEFAULT_INDEX_HANDLER_COUNT); + int minPriority = getMinPriority(conf); + int maxPriority = conf.getInt(MAX_INDEX_PRIOIRTY_KEY, DEFAULT_INDEX_MAX_PRIORITY); + // make sure the ranges are outside the warning ranges + Preconditions.checkArgument(maxPriority > minPriority, "Max index priority (" + maxPriority + + ") must be larger than min priority (" + minPriority + ")"); + boolean allSmaller = + minPriority < HConstants.REPLICATION_QOS + && maxPriority < HConstants.REPLICATION_QOS; + boolean allLarger = minPriority > HConstants.HIGH_QOS; + Preconditions.checkArgument(allSmaller || allLarger, "Index priority range (" + minPriority + + ", " + maxPriority + ") must be outside HBase priority range (" + + HConstants.REPLICATION_QOS + ", " + HConstants.HIGH_QOS + ")"); + + LOG.info("Using custom Phoenix Index RPC Handling with " + indexHandlerCount + + " handlers and priority range [" + minPriority + ", " + maxPriority + ")"); + + PhoenixIndexRpcScheduler scheduler = + new PhoenixIndexRpcScheduler(indexHandlerCount, conf, delegate, minPriority, + maxPriority); + return scheduler; + } + + public static int getMinPriority(Configuration conf) { + return conf.getInt(MIN_INDEX_PRIOIRTY_KEY, DEFAULT_INDEX_MIN_PRIORITY); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java index 8ef3e4f..907eb3d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.phoenix.hbase.index.table; import java.io.IOException; @@ -28,42 +27,50 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.util.Bytes; - +import org.apache.phoenix.hbase.index.IndexQosCompat; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; public class CoprocessorHTableFactory implements HTableFactory { - /** Number of milliseconds per-interval to retry zookeeper */ - private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL = "zookeeper.recovery.retry.intervalmill"; - /** Number of retries for zookeeper */ - private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry"; - private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class); - private CoprocessorEnvironment e; + /** Number of milliseconds per-interval to retry zookeeper */ + private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL = + "zookeeper.recovery.retry.intervalmill"; + /** Number of retries for zookeeper */ + private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry"; + private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class); + private CoprocessorEnvironment e; + + public CoprocessorHTableFactory(CoprocessorEnvironment e) { + this.e = e; + } + + @Override + public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { + Configuration conf = e.getConfiguration(); + // make sure writers fail fast + IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); + IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000); + IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3); + IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100); + IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000); + IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000); - public CoprocessorHTableFactory(CoprocessorEnvironment e) { - this.e = e; - } + // make sure we use the index priority writer for our rpcs + IndexQosCompat.setPhoenixIndexRpcController(conf); - @Override - public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { - Configuration conf = e.getConfiguration(); - // make sure writers fail fast - IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); - IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000); - IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3); - IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100); - IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000); - IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000); + // make sure we include the index table in the tables we need to track + String tableName = Bytes.toString(tablename.copyBytesIfNecessary()); + IndexQosCompat.enableIndexQosForTable(conf, tableName); - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new HTable: " + Bytes.toString(tablename.copyBytesIfNecessary())); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating new HTable: " + tableName); + } + return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary())); } - return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary())); - } - @Override - public void shutdown() { - // noop - } + @Override + public void shutdown() { + // noop + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index d4f8207..9b6fc4f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -494,6 +494,12 @@ public class IndexUtil { public byte[] getRow() { return cell.getRow(); } + + @Override + @Deprecated + public int getTagsLengthUnsigned() { + return cell.getTagsLengthUnsigned(); + } }; // Wrap cell in cell that offsets row key result.set(i, newCell); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java new file mode 100644 index 0000000..ec18d9b --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ipc.RpcScheduler.Context; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Test that the rpc scheduler schedules index writes to the index handler queue and sends + * everything else to the standard queues + */ +public class PhoenixIndexRpcSchedulerTest { + + private static final Configuration conf = HBaseConfiguration.create(); + + @Test + public void testIndexPriorityWritesToIndexHandler() throws Exception { + RpcScheduler mock = Mockito.mock(RpcScheduler.class); + + PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250); + BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1, 1); + scheduler.setExecutorForTesting(executor); + dispatchCallWithPriority(scheduler, 200); + List> queues = executor.getQueues(); + assertEquals(1, queues.size()); + BlockingQueue queue = queues.get(0); + queue.poll(20, TimeUnit.SECONDS); + + // try again, this time we tweak the ranges we support + scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110); + scheduler.setExecutorForTesting(executor); + dispatchCallWithPriority(scheduler, 101); + queue.poll(20, TimeUnit.SECONDS); + + Mockito.verify(mock, Mockito.times(2)).init(Mockito.any(Context.class)); + Mockito.verifyNoMoreInteractions(mock); + } + + /** + * Test that we delegate to the passed {@link RpcScheduler} when the call priority is outside + * the index range + * @throws Exception + */ + @Test + public void testDelegateWhenOutsideRange() throws Exception { + RpcScheduler mock = Mockito.mock(RpcScheduler.class); + PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250); + dispatchCallWithPriority(scheduler, 100); + dispatchCallWithPriority(scheduler, 250); + + // try again, this time we tweak the ranges we support + scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110); + dispatchCallWithPriority(scheduler, 200); + dispatchCallWithPriority(scheduler, 110); + + Mockito.verify(mock, Mockito.times(4)).init(Mockito.any(Context.class)); + Mockito.verify(mock, Mockito.times(4)).dispatch(Mockito.any(CallRunner.class)); + Mockito.verifyNoMoreInteractions(mock); + } + + private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception { + CallRunner task = Mockito.mock(CallRunner.class); + RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build(); + RpcServer server = new RpcServer(null, "test-rpcserver", null, null, conf, scheduler); + RpcServer.Call call = + server.new Call(0, null, null, header, null, null, null, null, 10, null); + Mockito.when(task.getCall()).thenReturn(call); + + scheduler.dispatch(task); + + Mockito.verify(task).getCall(); + Mockito.verifyNoMoreInteractions(task); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java new file mode 100644 index 0000000..0fd3d6b --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory; +import org.junit.Test; + +public class PhoenixIndexRpcSchedulerFactoryTest { + + @Test + public void ensureInstantiation() throws Exception { + Configuration conf = new Configuration(false); + conf.setClass(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + PhoenixIndexRpcSchedulerFactory.class, RpcSchedulerFactory.class); + // kinda lame that we copy the copy from the regionserver to do this and can't use a static + // method, but meh + try { + Class rpcSchedulerFactoryClass = + conf.getClass(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SimpleRpcSchedulerFactory.class); + Object o = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance()); + assertTrue(o instanceof PhoenixIndexRpcSchedulerFactory); + } catch (InstantiationException e) { + assertTrue("Should not have got an exception when instantiing the rpc scheduler: " + e, + false); + } catch (IllegalAccessException e) { + assertTrue("Should not have got an exception when instantiing the rpc scheduler: " + e, + false); + } + } + + /** + * Ensure that we can't configure the index priority ranges inside the hbase ranges + * @throws Exception + */ + @Test + public void testValidateIndexPriorityRanges() throws Exception { + Configuration conf = new Configuration(false); + // standard configs should be fine + PhoenixIndexRpcSchedulerFactory factory = new PhoenixIndexRpcSchedulerFactory(); + factory.create(conf, null); + + setMinMax(conf, 0, 4); + factory.create(conf, null); + + setMinMax(conf, 101, 102); + factory.create(conf, null); + + setMinMax(conf, 102, 101); + try { + factory.create(conf, null); + fail("Should not have allowed max less than min"); + } catch (IllegalArgumentException e) { + // expected + } + + setMinMax(conf, 5, 6); + try { + factory.create(conf, null); + fail("Should not have allowed min in range"); + } catch (IllegalArgumentException e) { + // expected + } + + setMinMax(conf, 6, 60); + try { + factory.create(conf, null); + fail("Should not have allowed min/max in hbase range"); + } catch (IllegalArgumentException e) { + // expected + } + + setMinMax(conf, 6, 101); + try { + factory.create(conf, null); + fail("Should not have allowed in range"); + } catch (IllegalArgumentException e) { + // expected + } + } + + private void setMinMax(Configuration conf, int min, int max) { + conf.setInt(PhoenixIndexRpcSchedulerFactory.MIN_INDEX_PRIOIRTY_KEY, min); + conf.setInt(PhoenixIndexRpcSchedulerFactory.MAX_INDEX_PRIOIRTY_KEY, max); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/1954c717/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d38b2d6..1347c45 100644 --- a/pom.xml +++ b/pom.xml @@ -67,8 +67,8 @@ true - 0.98.1-hadoop1 - 0.98.1-hadoop2 + 0.98.4-hadoop1 + 0.98.4-hadoop2 1.0.4 2.2.0