phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vincentp...@apache.org
Subject phoenix git commit: PHOENIX-4683 Cap timeouts for stats precompact hook logic
Date Mon, 09 Apr 2018 21:06:19 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master 0b1b219ef -> 28c11fe3f


PHOENIX-4683 Cap timeouts for stats precompact hook logic


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/28c11fe3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/28c11fe3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/28c11fe3

Branch: refs/heads/master
Commit: 28c11fe3f4647f192849adb9c2567cff9c405bbb
Parents: 0b1b219
Author: Vincent Poon <vincentpoon@apache.org>
Authored: Mon Apr 9 14:04:28 2018 -0700
Committer: Vincent Poon <vincentpoon@apache.org>
Committed: Mon Apr 9 14:06:15 2018 -0700

----------------------------------------------------------------------
 .../DelegateRegionCoprocessorEnvironment.java   |  9 ++-
 .../UngroupedAggregateRegionObserver.java       |  8 ++-
 .../hbase/index/write/IndexWriterUtils.java     | 71 ++-----------------
 .../org/apache/phoenix/util/ServerUtil.java     | 72 ++++++++++++++++++++
 4 files changed, 89 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/28c11fe3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
index a173251..a887632 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.ServerUtil;
 
 /**
  * Class to encapsulate {@link RegionCoprocessorEnvironment} for phoenix coprocessors. Often we
@@ -40,10 +43,12 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn
 
     private final Configuration config;
     private RegionCoprocessorEnvironment delegate;
+    private HTableFactory tableFactory;
 
     public DelegateRegionCoprocessorEnvironment(Configuration config, RegionCoprocessorEnvironment delegate) {
         this.config = config;
         this.delegate = delegate;
+        this.tableFactory = ServerUtil.getDelegateHTableFactory(this, config);
     }
 
     @Override
@@ -78,13 +83,13 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn
 
     @Override
     public HTableInterface getTable(TableName tableName) throws IOException {
-        return delegate.getTable(tableName);
+        return tableFactory.getTable(new ImmutableBytesPtr(tableName.getName()));
     }
 
     @Override
     public HTableInterface getTable(TableName tableName, ExecutorService service)
             throws IOException {
-        return delegate.getTable(tableName, service);
+        return tableFactory.getTable(new ImmutableBytesPtr(tableName.getName()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28c11fe3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 965ba1b..27d3880 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -94,6 +94,7 @@ import org.apache.phoenix.hbase.index.exception.IndexWriteException;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -978,10 +979,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     InternalScanner internalScanner = scanner;
                     try {
                         long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
+                        DelegateRegionCoprocessorEnvironment compactionConfEnv = new DelegateRegionCoprocessorEnvironment(compactionConfig, c.getEnvironment());
                         StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
-                            c.getEnvironment(), table.getNameAsString(), clientTimeStamp,
+                            compactionConfEnv, table.getNameAsString(), clientTimeStamp,
                             store.getFamily().getName());
-                        internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner);
+                        internalScanner =
+                                stats.createCompactionScanner(compactionConfEnv,
+                                    store, scanner);
                     } catch (Exception e) {
                         // If we can't reach the stats table, don't interrupt the normal
                         // compaction operation, just log a warning.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28c11fe3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
index 29b9faf..76d6800 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -17,25 +17,14 @@
  */
 package org.apache.phoenix.hbase.index.write;
 
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import javax.annotation.concurrent.GuardedBy;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.client.CoprocessorHConnection;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ServerUtil;
 
 public class IndexWriterUtils {
 
@@ -86,66 +75,14 @@ public class IndexWriterUtils {
 
     public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
         // create a simple delegate factory, setup the way we need
-        Configuration conf = env.getConfiguration();
+        Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration());
         // set the number of threads allowed per table.
         int htableThreads =
                 conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
                     IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
         LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
         IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
-        if (env instanceof RegionCoprocessorEnvironment) {
-            RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
-            RegionServerServices services = e.getRegionServerServices();
-            if (services instanceof HRegionServer) {
-                return new CoprocessorHConnectionTableFactory(conf, (HRegionServer) services);
-            }
-        }
-        return new CoprocessorHTableFactory(env);
+        return ServerUtil.getDelegateHTableFactory(env, conf);
     }
 
-    /**
-     * {@code HTableFactory} that creates HTables by using a {@link CoprocessorHConnection} This
-     * factory was added as a workaround to the bug reported in
-     * https://issues.apache.org/jira/browse/HBASE-18359
-     */
-    private static class CoprocessorHConnectionTableFactory implements HTableFactory {
-        @GuardedBy("CoprocessorHConnectionTableFactory.this")
-        private HConnection connection;
-        private final Configuration conf;
-        private final HRegionServer server;
-
-        CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server) {
-            this.conf = conf;
-            this.server = server;
-        }
-
-        private synchronized HConnection getConnection(Configuration conf) throws IOException {
-            if (connection == null || connection.isClosed()) {
-                connection = new CoprocessorHConnection(conf, server);
-            }
-            return connection;
-        }
-
-        @Override
-        public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
-            return getConnection(conf).getTable(tablename.copyBytesIfNecessary());
-        }
-
-        @Override
-        public synchronized void shutdown() {
-            try {
-                if (connection != null && !connection.isClosed()) {
-                    connection.close();
-                }
-            } catch (Throwable e) {
-                LOG.warn("Error while trying to close the HConnection used by CoprocessorHConnectionTableFactory", e);
-            }
-        }
-
-        @Override
-        public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool)
-                throws IOException {
-            return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28c11fe3/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index bc2b625..4b3cc43 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -21,29 +21,44 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.CoprocessorHConnection;
+import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 
 
 @SuppressWarnings("deprecation")
 public class ServerUtil {
+    private static final Log LOG = LogFactory.getLog(ServerUtil.class);
     private static final int COPROCESSOR_SCAN_WORKS = VersionUtil.encodeVersion("0.98.6");
     
     private static final String FORMAT = "ERROR %d (%s): %s";
@@ -254,4 +269,61 @@ public class ServerUtil {
                     endKey) < 0));
     }
 
+    public static HTableFactory getDelegateHTableFactory(CoprocessorEnvironment env, Configuration conf) {
+        if (env instanceof RegionCoprocessorEnvironment) {
+            RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
+            RegionServerServices services = e.getRegionServerServices();
+            if (services instanceof HRegionServer) {
+                return new CoprocessorHConnectionTableFactory(conf, (HRegionServer) services);
+            }
+        }
+        return new CoprocessorHTableFactory(env);
+    }
+
+    /**
+     * {@code HTableFactory} that creates HTables by using a {@link CoprocessorHConnection} This
+     * factory was added as a workaround to the bug reported in
+     * https://issues.apache.org/jira/browse/HBASE-18359
+     */
+    public static class CoprocessorHConnectionTableFactory implements HTableFactory {
+        @GuardedBy("CoprocessorHConnectionTableFactory.this")
+        private HConnection connection;
+        private final Configuration conf;
+        private final HRegionServer server;
+
+        CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server) {
+            this.conf = conf;
+            this.server = server;
+        }
+
+        private synchronized HConnection getConnection(Configuration conf) throws IOException {
+            if (connection == null || connection.isClosed()) {
+                connection = new CoprocessorHConnection(conf, server);
+            }
+            return connection;
+        }
+
+        @Override
+        public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+            return getConnection(conf).getTable(tablename.copyBytesIfNecessary());
+        }
+
+        @Override
+        public synchronized void shutdown() {
+            try {
+                if (connection != null && !connection.isClosed()) {
+                    connection.close();
+                }
+            } catch (Throwable e) {
+                LOG.warn("Error while trying to close the HConnection used by CoprocessorHConnectionTableFactory", e);
+            }
+        }
+
+        @Override
+        public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool)
+                throws IOException {
+            return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool);
+        }
+    }
+
 }


Mime
View raw message