phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/4] phoenix git commit: PHOENIX-1413 Add Phoenix coprocessors with configurable priority
Date Fri, 07 Nov 2014 03:45:42 GMT
PHOENIX-1413 Add Phoenix coprocessors with configurable priority


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b46d3e23
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b46d3e23
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b46d3e23

Branch: refs/heads/4.0
Commit: b46d3e23f7fdd357a5d8845559f700a68a879c16
Parents: 798531c
Author: James Taylor <jtaylor@salesforce.com>
Authored: Thu Nov 6 18:24:54 2014 -0800
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Thu Nov 6 18:24:54 2014 -0800

----------------------------------------------------------------------
 .../EndToEndCoveredColumnsIndexBuilderIT.java   |  3 ++-
 .../org/apache/phoenix/hbase/index/Indexer.java | 10 ++++-----
 .../CoveredColumnIndexSpecifierBuilder.java     |  4 ++--
 .../query/ConnectionQueryServicesImpl.java      | 23 ++++++++++----------
 .../org/apache/phoenix/query/QueryServices.java |  2 ++
 .../phoenix/query/QueryServicesOptions.java     |  5 +++++
 6 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b46d3e23/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index d93b011..9aae820 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -33,6 +33,7 @@ import java.util.Queue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -306,7 +307,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
     // initializer blows up.
     indexerOpts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY,
       CoveredIndexCodecForTesting.class.getName());
-    Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts);
+    Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER);
 
     // create the table
     HBaseAdmin admin = UTIL.getHBaseAdmin();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b46d3e23/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 9c48a8d..b841410 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -31,7 +31,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -607,18 +606,19 @@ public class Indexer extends BaseRegionObserver {
   /**
    * Enable indexing on the given table
    * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled
-   * @param builder class to use when building the index for this table
-   * @param properties map of custom configuration options to make available to your
+ * @param builder class to use when building the index for this table
+ * @param properties map of custom configuration options to make available to your
    *          {@link IndexBuilder} on the server-side
+ * @param priority priority at which the Indexer coprocessor is added to the table descriptor
    * @throws IOException the Indexer coprocessor cannot be added
    */
   public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBuilder> builder,
-      Map<String, String> properties) throws IOException {
+      Map<String, String> properties, int priority) throws IOException {
     if (properties == null) {
       properties = new HashMap<String, String>();
     }
     properties.put(Indexer.INDEX_BUILDER_CONF_KEY, builder.getName());
-    desc.addCoprocessor(Indexer.class.getName(), null, Coprocessor.PRIORITY_USER, properties);
+    desc.addCoprocessor(Indexer.class.getName(), null, priority, properties);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b46d3e23/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
index 9fcd5f3..6ac89d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
@@ -26,8 +26,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
@@ -137,7 +137,7 @@ public class CoveredColumnIndexSpecifierBuilder {
     // add the codec for the index to the map of options
     Map<String, String> opts = this.convertToMap();
     opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
-    Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts);
+    Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, Coprocessor.PRIORITY_USER);
   }
 
   static List<ColumnGroup> getColumns(Configuration conf) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b46d3e23/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 4aa1bac..b73e2dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -604,18 +604,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
 
     private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType
tableType) throws SQLException {
         // The phoenix jar must be available on HBase classpath
+        int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
         try {
             if (!descriptor.hasCoprocessor(ScanRegionObserver.class.getName())) {
-                descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, 1, null);
+                descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, priority,
null);
             }
             if (!descriptor.hasCoprocessor(UngroupedAggregateRegionObserver.class.getName()))
{
-                descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(),
null, 1, null);
+                descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(),
null, priority, null);
             }
             if (!descriptor.hasCoprocessor(GroupedAggregateRegionObserver.class.getName()))
{
-                descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(),
null, 1, null);
+                descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(),
null, priority, null);
             }
             if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
-                descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null,
1, null);
+                descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null,
priority, null);
             }
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
@@ -627,11 +628,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                     && !descriptor.hasCoprocessor(Indexer.class.getName())) {
                 Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
                 opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
-                Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts);
+                Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
             }
             if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName()))
{
                 descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
-                        null, 1, null);
+                        null, priority, null);
             }
             
             if (descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null
@@ -639,13 +640,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                             .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
                 if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName()))
{
                     descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
-                        null, 1, null);
+                        null, priority, null);
                 }
             } else {
                 if (!descriptor.hasCoprocessor(LocalIndexSplitter.class.getName())
                         && !SchemaUtil.isMetaTable(tableName)
                         && !SchemaUtil.isSequenceTable(tableName)) {
-                    descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, 1,
null);
+                    descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, priority,
null);
                 }
             }
 
@@ -653,14 +654,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
             // stay on the same region.
             if (SchemaUtil.isMetaTable(tableName)) {
                 if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) {
-                    descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null,
1, null);
+                    descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null,
priority, null);
                 }
                 if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) {
-                    descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null,
2, null);
+                    descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null,
priority + 1, null);
                 }
             } else if (SchemaUtil.isSequenceTable(tableName)) {
                 if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) {
-                    descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null,
1, null);
+                    descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null,
priority, null);
                 }
             }
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b46d3e23/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 72002ae..414ed57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -139,6 +139,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime";
 
     public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets";
+    public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority";
+    
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b46d3e23/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 7c8ecd4..8491783 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -60,6 +60,7 @@ import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.trace.util.Tracing;
@@ -156,6 +157,10 @@ public class QueryServicesOptions {
      * Use only first time SYSTEM.SEQUENCE table is created.
      */
     public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS = SaltingUtil.MAX_BUCKET_NUM;
+    /**
+     * Default value for coprocessor priority is between SYSTEM and USER priority.
+     */
+    public static final int DEFAULT_COPROCESSOR_PRIORITY = Coprocessor.PRIORITY_SYSTEM/2 + Coprocessor.PRIORITY_USER/2; // Divide individually to prevent any overflow
 
     private final Configuration config;
 


Mime
View raw message