From commits-return-71741-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Tue Apr 24 16:48:32 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id C32EB180679 for ; Tue, 24 Apr 2018 16:48:29 +0200 (CEST) Received: (qmail 78469 invoked by uid 500); 24 Apr 2018 14:48:28 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 78452 invoked by uid 99); 24 Apr 2018 14:48:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Apr 2018 14:48:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4C28FE96A8; Tue, 24 Apr 2018 14:48:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Tue, 24 Apr 2018 14:48:29 -0000 Message-Id: <6799b05614744aa1a090d2722318da95@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/10] hbase-site git commit: Published site at a8be3bb814378805a1b5eae85826c9eff3768534. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/c81ffb38/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.html index 6950867..a338ca1 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.html @@ -44,1728 +44,1738 @@ 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CompareOperator; 038import org.apache.hadoop.hbase.HBaseConfiguration; -039import org.apache.hadoop.hbase.RawCellBuilder; -040import org.apache.hadoop.hbase.RawCellBuilderFactory; -041import org.apache.hadoop.hbase.ServerName; -042import org.apache.hadoop.hbase.SharedConnection; -043import org.apache.hadoop.hbase.client.Append; -044import org.apache.hadoop.hbase.client.Connection; -045import org.apache.hadoop.hbase.client.Delete; -046import org.apache.hadoop.hbase.client.Durability; -047import org.apache.hadoop.hbase.client.Get; -048import org.apache.hadoop.hbase.client.Increment; -049import org.apache.hadoop.hbase.client.Mutation; -050import org.apache.hadoop.hbase.client.Put; -051import org.apache.hadoop.hbase.client.RegionInfo; -052import org.apache.hadoop.hbase.client.Result; -053import org.apache.hadoop.hbase.client.Scan; -054import org.apache.hadoop.hbase.client.TableDescriptor; -055import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; -056import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; -057import org.apache.hadoop.hbase.coprocessor.CoprocessorException; -058import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -059import org.apache.hadoop.hbase.coprocessor.CoprocessorService; -060import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; -061import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; -062import org.apache.hadoop.hbase.coprocessor.EndpointObserver; -063import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; -064import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; -065import org.apache.hadoop.hbase.coprocessor.ObserverContext; -066import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; -067import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -068import org.apache.hadoop.hbase.coprocessor.RegionObserver; -069import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; -070import org.apache.hadoop.hbase.filter.ByteArrayComparable; -071import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -072import org.apache.hadoop.hbase.io.Reference; -073import org.apache.hadoop.hbase.io.hfile.CacheConfig; -074import org.apache.hadoop.hbase.metrics.MetricRegistry; -075import org.apache.hadoop.hbase.regionserver.Region.Operation; -076import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; -077import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -078import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; -079import org.apache.hadoop.hbase.security.User; -080import org.apache.hadoop.hbase.util.CoprocessorClassLoader; -081import org.apache.hadoop.hbase.util.Pair; -082import org.apache.hadoop.hbase.wal.WALEdit; -083import org.apache.hadoop.hbase.wal.WALKey; -084import org.apache.yetus.audience.InterfaceAudience; -085import org.slf4j.Logger; -086import org.slf4j.LoggerFactory; -087 -088import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; -089import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; -090 -091/** -092 * Implements the coprocessor environment and runtime support for coprocessors -093 * loaded within a {@link Region}. -094 */ -095@InterfaceAudience.Private -096public class RegionCoprocessorHost -097 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { -098 -099 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); -100 // The shared data map -101 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = -102 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, -103 AbstractReferenceMap.ReferenceStrength.WEAK); -104 -105 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it -106 private final boolean hasCustomPostScannerFilterRow; -107 -108 /** -109 * -110 * Encapsulation of the environment of each coprocessor -111 */ -112 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> -113 implements RegionCoprocessorEnvironment { -114 private Region region; -115 ConcurrentMap<String, Object> sharedData; -116 private final MetricRegistry metricRegistry; -117 private final RegionServerServices services; -118 -119 /** -120 * Constructor -121 * @param impl the coprocessor instance -122 * @param priority chaining priority -123 */ -124 public RegionEnvironment(final RegionCoprocessor impl, final int priority, -125 final int seq, final Configuration conf, final Region region, -126 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { -127 super(impl, priority, seq, conf); -128 this.region = region; -129 this.sharedData = sharedData; -130 this.services = services; -131 this.metricRegistry = -132 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); -133 } -134 -135 /** @return the region */ -136 @Override -137 public Region getRegion() { -138 return region; -139 } -140 -141 @Override -142 public OnlineRegions getOnlineRegions() { -143 return this.services; -144 } -145 -146 @Override -147 public Connection getConnection() { -148 // Mocks may have services as null at test time. -149 return services != null ? new SharedConnection(services.getConnection()) : null; -150 } -151 -152 @Override -153 public Connection createConnection(Configuration conf) throws IOException { -154 return services != null ? this.services.createConnection(conf) : null; -155 } -156 -157 @Override -158 public ServerName getServerName() { -159 return services != null? services.getServerName(): null; -160 } -161 -162 @Override -163 public void shutdown() { -164 super.shutdown(); -165 MetricsCoprocessor.removeRegistry(this.metricRegistry); -166 } -167 -168 @Override -169 public ConcurrentMap<String, Object> getSharedData() { -170 return sharedData; -171 } -172 -173 @Override -174 public RegionInfo getRegionInfo() { -175 return region.getRegionInfo(); -176 } -177 -178 @Override -179 public MetricRegistry getMetricRegistryForRegionServer() { -180 return metricRegistry; -181 } -182 -183 @Override -184 public RawCellBuilder getCellBuilder() { -185 // We always do a DEEP_COPY only -186 return RawCellBuilderFactory.create(); -187 } -188 } -189 -190 /** -191 * Special version of RegionEnvironment that exposes RegionServerServices for Core -192 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. -193 */ -194 private static class RegionEnvironmentForCoreCoprocessors extends -195 RegionEnvironment implements HasRegionServerServices { -196 private final RegionServerServices rsServices; -197 -198 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, -199 final int seq, final Configuration conf, final Region region, -200 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { -201 super(impl, priority, seq, conf, region, services, sharedData); -202 this.rsServices = services; -203 } -204 -205 /** -206 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor -207 * consumption. -208 */ -209 @Override -210 public RegionServerServices getRegionServerServices() { -211 return this.rsServices; -212 } -213 } -214 -215 static class TableCoprocessorAttribute { -216 private Path path; -217 private String className; -218 private int priority; -219 private Configuration conf; -220 -221 public TableCoprocessorAttribute(Path path, String className, int priority, -222 Configuration conf) { -223 this.path = path; -224 this.className = className; -225 this.priority = priority; -226 this.conf = conf; -227 } -228 -229 public Path getPath() { -230 return path; -231 } -232 -233 public String getClassName() { -234 return className; -235 } -236 -237 public int getPriority() { -238 return priority; -239 } -240 -241 public Configuration getConf() { -242 return conf; -243 } -244 } -245 -246 /** The region server services */ -247 RegionServerServices rsServices; -248 /** The region */ -249 HRegion region; -250 -251 /** -252 * Constructor -253 * @param region the region -254 * @param rsServices interface to available region server functionality -255 * @param conf the configuration -256 */ -257 public RegionCoprocessorHost(final HRegion region, -258 final RegionServerServices rsServices, final Configuration conf) { -259 super(rsServices); -260 this.conf = conf; -261 this.rsServices = rsServices; -262 this.region = region; -263 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); -264 -265 // load system default cp's from configuration. -266 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); -267 -268 // load system default cp's for user tables from configuration. -269 if (!region.getRegionInfo().getTable().isSystemTable()) { -270 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); -271 } -272 -273 // load Coprocessor From HDFS -274 loadTableCoprocessors(conf); -275 -276 // now check whether any coprocessor implements postScannerFilterRow -277 boolean hasCustomPostScannerFilterRow = false; -278 out: for (RegionCoprocessorEnvironment env: coprocEnvironments) { -279 if (env.getInstance() instanceof RegionObserver) { -280 Class<?> clazz = env.getInstance().getClass(); -281 for(;;) { -282 if (clazz == null) { -283 // we must have directly implemented RegionObserver -284 hasCustomPostScannerFilterRow = true; -285 break out; -286 } -287 try { -288 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, -289 InternalScanner.class, Cell.class, boolean.class); -290 // this coprocessor has a custom version of postScannerFilterRow -291 hasCustomPostScannerFilterRow = true; -292 break out; -293 } catch (NoSuchMethodException ignore) { -294 } -295 // the deprecated signature still exists -296 try { -297 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, -298 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); -299 // this coprocessor has a custom version of postScannerFilterRow -300 hasCustomPostScannerFilterRow = true; -301 break out; -302 } catch (NoSuchMethodException ignore) { -303 } -304 clazz = clazz.getSuperclass(); -305 } -306 } -307 } -308 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; -309 } -310 -311 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, -312 TableDescriptor htd) { -313 return htd.getCoprocessorDescriptors().stream().map(cp -> { -314 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); -315 Configuration ourConf; -316 if (!cp.getProperties().isEmpty()) { -317 // do an explicit deep copy of the passed configuration -318 ourConf = new Configuration(false); -319 HBaseConfiguration.merge(ourConf, conf); -320 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); -321 } else { -322 ourConf = conf; -323 } -324 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); -325 }).collect(Collectors.toList()); -326 } -327 -328 /** -329 * Sanity check the table coprocessor attributes of the supplied schema. Will -330 * throw an exception if there is a problem. -331 * @param conf -332 * @param htd -333 * @throws IOException -334 */ -335 public static void testTableCoprocessorAttrs(final Configuration conf, -336 final TableDescriptor htd) throws IOException { -337 String pathPrefix = UUID.randomUUID().toString(); -338 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) { -339 if (attr.getPriority() < 0) { -340 throw new IOException("Priority for coprocessor " + attr.getClassName() + -341 " cannot be less than 0"); -342 } -343 ClassLoader old = Thread.currentThread().getContextClassLoader(); -344 try { -345 ClassLoader cl; -346 if (attr.getPath() != null) { -347 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), -348 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); -349 } else { -350 cl = CoprocessorHost.class.getClassLoader(); -351 } -352 Thread.currentThread().setContextClassLoader(cl); -353 cl.loadClass(attr.getClassName()); -354 } catch (ClassNotFoundException e) { -355 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); -356 } finally { -357 Thread.currentThread().setContextClassLoader(old); -358 } -359 } -360 } -361 -362 void loadTableCoprocessors(final Configuration conf) { -363 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, -364 DEFAULT_COPROCESSORS_ENABLED); -365 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, -366 DEFAULT_USER_COPROCESSORS_ENABLED); -367 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { -368 return; +039import org.apache.hadoop.hbase.HConstants; +040import org.apache.hadoop.hbase.RawCellBuilder; +041import org.apache.hadoop.hbase.RawCellBuilderFactory; +042import org.apache.hadoop.hbase.ServerName; +043import org.apache.hadoop.hbase.SharedConnection; +044import org.apache.hadoop.hbase.client.Append; +045import org.apache.hadoop.hbase.client.Connection; +046import org.apache.hadoop.hbase.client.Delete; +047import org.apache.hadoop.hbase.client.Durability; +048import org.apache.hadoop.hbase.client.Get; +049import org.apache.hadoop.hbase.client.Increment; +050import org.apache.hadoop.hbase.client.Mutation; +051import org.apache.hadoop.hbase.client.Put; +052import org.apache.hadoop.hbase.client.RegionInfo; +053import org.apache.hadoop.hbase.client.Result; +054import org.apache.hadoop.hbase.client.Scan; +055import org.apache.hadoop.hbase.client.TableDescriptor; +056import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; +057import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; +058import org.apache.hadoop.hbase.coprocessor.CoprocessorException; +059import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +060import org.apache.hadoop.hbase.coprocessor.CoprocessorService; +061import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; +062import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; +063import org.apache.hadoop.hbase.coprocessor.EndpointObserver; +064import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; +065import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; +066import org.apache.hadoop.hbase.coprocessor.ObserverContext; +067import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +068import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +069import org.apache.hadoop.hbase.coprocessor.RegionObserver; +070import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; +071import org.apache.hadoop.hbase.filter.ByteArrayComparable; +072import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +073import org.apache.hadoop.hbase.io.Reference; +074import org.apache.hadoop.hbase.io.hfile.CacheConfig; +075import org.apache.hadoop.hbase.metrics.MetricRegistry; +076import org.apache.hadoop.hbase.regionserver.Region.Operation; +077import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +078import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +079import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; +080import org.apache.hadoop.hbase.security.User; +081import org.apache.hadoop.hbase.util.CoprocessorClassLoader; +082import org.apache.hadoop.hbase.util.Pair; +083import org.apache.hadoop.hbase.wal.WALEdit; +084import org.apache.hadoop.hbase.wal.WALKey; +085import org.apache.yetus.audience.InterfaceAudience; +086import org.slf4j.Logger; +087import org.slf4j.LoggerFactory; +088 +089import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; +090import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; +091 +092/** +093 * Implements the coprocessor environment and runtime support for coprocessors +094 * loaded within a {@link Region}. +095 */ +096@InterfaceAudience.Private +097public class RegionCoprocessorHost +098 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { +099 +100 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); +101 // The shared data map +102 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = +103 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, +104 AbstractReferenceMap.ReferenceStrength.WEAK); +105 +106 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it +107 private final boolean hasCustomPostScannerFilterRow; +108 +109 /** +110 * +111 * Encapsulation of the environment of each coprocessor +112 */ +113 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> +114 implements RegionCoprocessorEnvironment { +115 private Region region; +116 ConcurrentMap<String, Object> sharedData; +117 private final MetricRegistry metricRegistry; +118 private final RegionServerServices services; +119 +120 /** +121 * Constructor +122 * @param impl the coprocessor instance +123 * @param priority chaining priority +124 */ +125 public RegionEnvironment(final RegionCoprocessor impl, final int priority, +126 final int seq, final Configuration conf, final Region region, +127 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { +128 super(impl, priority, seq, conf); +129 this.region = region; +130 this.sharedData = sharedData; +131 this.services = services; +132 this.metricRegistry = +133 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); +134 } +135 +136 /** @return the region */ +137 @Override +138 public Region getRegion() { +139 return region; +140 } +141 +142 @Override +143 public OnlineRegions getOnlineRegions() { +144 return this.services; +145 } +146 +147 @Override +148 public Connection getConnection() { +149 // Mocks may have services as null at test time. +150 return services != null ? new SharedConnection(services.getConnection()) : null; +151 } +152 +153 @Override +154 public Connection createConnection(Configuration conf) throws IOException { +155 return services != null ? this.services.createConnection(conf) : null; +156 } +157 +158 @Override +159 public ServerName getServerName() { +160 return services != null? services.getServerName(): null; +161 } +162 +163 @Override +164 public void shutdown() { +165 super.shutdown(); +166 MetricsCoprocessor.removeRegistry(this.metricRegistry); +167 } +168 +169 @Override +170 public ConcurrentMap<String, Object> getSharedData() { +171 return sharedData; +172 } +173 +174 @Override +175 public RegionInfo getRegionInfo() { +176 return region.getRegionInfo(); +177 } +178 +179 @Override +180 public MetricRegistry getMetricRegistryForRegionServer() { +181 return metricRegistry; +182 } +183 +184 @Override +185 public RawCellBuilder getCellBuilder() { +186 // We always do a DEEP_COPY only +187 return RawCellBuilderFactory.create(); +188 } +189 } +190 +191 /** +192 * Special version of RegionEnvironment that exposes RegionServerServices for Core +193 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. +194 */ +195 private static class RegionEnvironmentForCoreCoprocessors extends +196 RegionEnvironment implements HasRegionServerServices { +197 private final RegionServerServices rsServices; +198 +199 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, +200 final int seq, final Configuration conf, final Region region, +201 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { +202 super(impl, priority, seq, conf, region, services, sharedData); +203 this.rsServices = services; +204 } +205 +206 /** +207 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor +208 * consumption. +209 */ +210 @Override +211 public RegionServerServices getRegionServerServices() { +212 return this.rsServices; +213 } +214 } +215 +216 static class TableCoprocessorAttribute { +217 private Path path; +218 private String className; +219 private int priority; +220 private Configuration conf; +221 +222 public TableCoprocessorAttribute(Path path, String className, int priority, +223 Configuration conf) { +224 this.path = path; +225 this.className = className; +226 this.priority = priority; +227 this.conf = conf; +228 } +229 +230 public Path getPath() { +231 return path; +232 } +233 +234 public String getClassName() { +235 return className; +236 } +237 +238 public int getPriority() { +239 return priority; +240 } +241 +242 public Configuration getConf() { +243 return conf; +244 } +245 } +246 +247 /** The region server services */ +248 RegionServerServices rsServices; +249 /** The region */ +250 HRegion region; +251 +252 /** +253 * Constructor +254 * @param region the region +255 * @param rsServices interface to available region server functionality +256 * @param conf the configuration +257 */ +258 public RegionCoprocessorHost(final HRegion region, +259 final RegionServerServices rsServices, final Configuration conf) { +260 super(rsServices); +261 this.conf = conf; +262 this.rsServices = rsServices; +263 this.region = region; +264 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); +265 +266 // load system default cp's from configuration. +267 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); +268 +269 // load system default cp's for user tables from configuration. +270 if (!region.getRegionInfo().getTable().isSystemTable()) { +271 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); +272 } +273 +274 // load Coprocessor From HDFS +275 loadTableCoprocessors(conf); +276 +277 // now check whether any coprocessor implements postScannerFilterRow +278 boolean hasCustomPostScannerFilterRow = false; +279 out: for (RegionCoprocessorEnvironment env: coprocEnvironments) { +280 if (env.getInstance() instanceof RegionObserver) { +281 Class<?> clazz = env.getInstance().getClass(); +282 for(;;) { +283 if (clazz == null) { +284 // we must have directly implemented RegionObserver +285 hasCustomPostScannerFilterRow = true; +286 break out; +287 } +288 try { +289 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, +290 InternalScanner.class, Cell.class, boolean.class); +291 // this coprocessor has a custom version of postScannerFilterRow +292 hasCustomPostScannerFilterRow = true; +293 break out; +294 } catch (NoSuchMethodException ignore) { +295 } +296 // the deprecated signature still exists +297 try { +298 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, +299 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); +300 // this coprocessor has a custom version of postScannerFilterRow +301 hasCustomPostScannerFilterRow = true; +302 break out; +303 } catch (NoSuchMethodException ignore) { +304 } +305 clazz = clazz.getSuperclass(); +306 } +307 } +308 } +309 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; +310 } +311 +312 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, +313 TableDescriptor htd) { +314 return htd.getCoprocessorDescriptors().stream().map(cp -> { +315 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); +316 Configuration ourConf; +317 if (!cp.getProperties().isEmpty()) { +318 // do an explicit deep copy of the passed configuration +319 ourConf = new Configuration(false); +320 HBaseConfiguration.merge(ourConf, conf); +321 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); +322 } else { +323 ourConf = conf; +324 } +325 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); +326 }).collect(Collectors.toList()); +327 } +328 +329 /** +330 * Sanity check the table coprocessor attributes of the supplied schema. Will +331 * throw an exception if there is a problem. +332 * @param conf +333 * @param htd +334 * @throws IOException +335 */ +336 public static void testTableCoprocessorAttrs(final Configuration conf, +337 final TableDescriptor htd) throws IOException { +338 String pathPrefix = UUID.randomUUID().toString(); +339 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) { +340 if (attr.getPriority() < 0) { +341 throw new IOException("Priority for coprocessor " + attr.getClassName() + +342 " cannot be less than 0"); +343 } +344 ClassLoader old = Thread.currentThread().getContextClassLoader(); +345 try { +346 ClassLoader cl; +347 if (attr.getPath() != null) { +348 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), +349 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); +350 } else { +351 cl = CoprocessorHost.class.getClassLoader(); +352 } +353 Thread.currentThread().setContextClassLoader(cl); +354 if (cl instanceof CoprocessorClassLoader) { +355 String[] includedClassPrefixes = null; +356 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { +357 String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); +358 includedClassPrefixes = prefixes.split(";"); +359 } +360 ((CoprocessorClassLoader)cl).loadClass(attr.getClassName(), includedClassPrefixes); +361 } else { +362 cl.loadClass(attr.getClassName()); +363 } +364 } catch (ClassNotFoundException e) { +365 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); +366 } finally { +367 Thread.currentThread().setContextClassLoader(old); +368 } 369 } -370 -371 // scan the table attributes for coprocessor load specifications -372 // initialize the coprocessors -373 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); -374 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, -375 region.getTableDescriptor())) { -376 // Load encompasses classloading and coprocessor initialization -377 try { -378 RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(), -379 attr.getPriority(), attr.getConf()); -380 if (env == null) { -381 continue; -382 } -383 configured.add(env); -384 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " + -385 region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); -386 } catch (Throwable t) { -387 // Coprocessor failed to load, do we abort on error? -388 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { -389 abortServer(attr.getClassName(), t); -390 } else { -391 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); +370 } +371 +372 void loadTableCoprocessors(final Configuration conf) { +373 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, +374 DEFAULT_COPROCESSORS_ENABLED); +375 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, +376 DEFAULT_USER_COPROCESSORS_ENABLED); +377 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { +378 return; +379 } +380 +381 // scan the table attributes for coprocessor load specifications +382 // initialize the coprocessors +383 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); +384 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, +385 region.getTableDescriptor())) { +386 // Load encompasses classloading and coprocessor initialization +387 try { +388 RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(), +389 attr.getPriority(), attr.getConf()); +390 if (env == null) { +391 continue; 392 } -393 } -394 } -395 // add together to coprocessor set for COW efficiency -396 coprocEnvironments.addAll(configured); -397 } -398 -399 @Override -400 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, -401 Configuration conf) { -402 // If coprocessor exposes any services, register them. -403 for (Service service : instance.getServices()) { -404 region.registerService(service); -405 } -406 ConcurrentMap<String, Object> classData; -407 // make sure only one thread can add maps -408 synchronized (SHARED_DATA_MAP) { -409 // as long as at least one RegionEnvironment holds on to its classData it will -410 // remain in this map -411 classData = -412 SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), -413 k -> new ConcurrentHashMap<>()); -414 } -415 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. -416 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)? -417 new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, -418 rsServices, classData): -419 new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); -420 } -421 -422 @Override -423 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) -424 throws InstantiationException, IllegalAccessException { -425 try { -426 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { -427 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); -428 } else if (CoprocessorService.class.isAssignableFrom(implClass)) { -429 // For backward compatibility with old CoprocessorService impl which don't extend -430 // RegionCoprocessor. -431 CoprocessorService cs; -432 cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance(); -433 return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs); -434 } else { -435 LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}", -436 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); -437 return null; -438 } -439 } catch (NoSuchMethodException | InvocationTargetException e) { -440 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); -441 } -442 } -443 -444 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = -445 RegionCoprocessor::getRegionObserver; -446 -447 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = -448 RegionCoprocessor::getEndpointObserver; -449 -450 abstract class RegionObserverOperationWithoutResult extends -451 ObserverOperationWithoutResult<RegionObserver> { -452 public RegionObserverOperationWithoutResult() { -453 super(regionObserverGetter); -454 } -455 -456 public RegionObserverOperationWithoutResult(User user) { -457 super(regionObserverGetter, user); -458 } +393 configured.add(env); +394 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " + +395 region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); +396 } catch (Throwable t) { +397 // Coprocessor failed to load, do we abort on error? +398 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { +399 abortServer(attr.getClassName(), t); +400 } else { +401 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); +402 } +403 } +404 } +405 // add together to coprocessor set for COW efficiency +406 coprocEnvironments.addAll(configured); +407 } +408 +409 @Override +410 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, +411 Configuration conf) { +412 // If coprocessor exposes any services, register them. +413 for (Service service : instance.getServices()) { +414 region.registerService(service); +415 } +416 ConcurrentMap<String, Object> classData; +417 // make sure only one thread can add maps +418 synchronized (SHARED_DATA_MAP) { +419 // as long as at least one RegionEnvironment holds on to its classData it will +420 // remain in this map +421 classData = +422 SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), +423 k -> new ConcurrentHashMap<>()); +424 } +425 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. +426 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)? +427 new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, +428 rsServices, classData): +429 new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); +430 } +431 +432 @Override +433 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) +434 throws InstantiationException, IllegalAccessException { +435 try { +436 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { +437 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); +438 } else if (CoprocessorService.class.isAssignableFrom(implClass)) { +439 // For backward compatibility with old CoprocessorService impl which don't extend +440 // RegionCoprocessor. +441 CoprocessorService cs; +442 cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance(); +443 return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs); +444 } else { +445 LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}", +446 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); +447 return null; +448 } +449 } catch (NoSuchMethodException | InvocationTargetException e) { +450 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); +451 } +452 } +453 +454 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = +455 RegionCoprocessor::getRegionObserver; +456 +457 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = +458 RegionCoprocessor::getEndpointObserver; 459 -460 public RegionObserverOperationWithoutResult(boolean bypassable) { -461 super(regionObserverGetter, null, bypassable); -462 } -463 -464 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { -465 super(regionObserverGetter, user, bypassable); -466 } -467 } -468 -469 abstract class BulkLoadObserverOperation extends -470 ObserverOperationWithoutResult<BulkLoadObserver> { -471 public BulkLoadObserverOperation(User user) { -472 super(RegionCoprocessor::getBulkLoadObserver, user); -473 } -474 } -475 -476 -477 ////////////////////////////////////////////////////////////////////////////////////////////////// -478 // Observer operations -479 ////////////////////////////////////////////////////////////////////////////////////////////////// -480 -481 ////////////////////////////////////////////////////////////////////////////////////////////////// -482 // Observer operations -483 ////////////////////////////////////////////////////////////////////////////////////////////////// -484 -485 /** -486 * Invoked before a region open. -487 * -488 * @throws IOException Signals that an I/O exception has occurred. -489 */ -490 public void preOpen() throws IOException { -491 if (coprocEnvironments.isEmpty()) { -492 return; -493 } -494 execOperation(new RegionObserverOperationWithoutResult() { -495 @Override -496 public void call(RegionObserver observer) throws IOException { -497 observer.preOpen(this); -498 } -499 }); -500 } -501 -502 -503 /** -504 * Invoked after a region open -505 */ -