From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Thu, 21 Sep 2017 15:13:41 -0000
Message-Id: <6096864a1dca4cf0ba50740938c66a4e@git.apache.org>
In-Reply-To: <8d5689eca5904cd1bafec1556f5885a0@git.apache.org>
References: <8d5689eca5904cd1bafec1556f5885a0@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [08/51] [partial] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/fa7d6c0c/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.RegionOperation.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.RegionOperation.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.RegionOperation.html
index cf3f141..a4d46aa 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.RegionOperation.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.RegionOperation.html
@@ -27,1612 +27,1609 @@

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;

import com.google.protobuf.Message;
import com.google.protobuf.Service;
import org.apache.commons.collections4.map.AbstractReferenceMap;
import org.apache.commons.collections4.map.ReferenceMap;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
 * Implements the coprocessor environment and runtime support for coprocessors
 * loaded within a {@link Region}.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class RegionCoprocessorHost
    extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {

  private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
  // The shared data map
  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
      new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
          AbstractReferenceMap.ReferenceStrength.WEAK);

  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it
  private final boolean hasCustomPostScannerFilterRow;
  /**
   *
   * Encapsulation of the environment of each coprocessor
   */
  static class RegionEnvironment extends CoprocessorHost.Environment
      implements RegionCoprocessorEnvironment {

    private Region region;
    private RegionServerServices rsServices;
    ConcurrentMap<String, Object> sharedData;
    private final MetricRegistry metricRegistry;

    /**
     * Constructor
     * @param impl the coprocessor instance
     * @param priority chaining priority
     */
    public RegionEnvironment(final Coprocessor impl, final int priority,
        final int seq, final Configuration conf, final Region region,
        final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
      super(impl, priority, seq, conf);
      this.region = region;
      this.rsServices = services;
      this.sharedData = sharedData;
      this.metricRegistry =
          MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
    }

    /** @return the region */
    @Override
    public Region getRegion() {
      return region;
    }

    /** @return reference to the region server services */
    @Override
    public RegionServerServices getRegionServerServices() {
      return rsServices;
    }

    public void shutdown() {
      super.shutdown();
      MetricsCoprocessor.removeRegistry(this.metricRegistry);
    }

    @Override
    public ConcurrentMap<String, Object> getSharedData() {
      return sharedData;
    }

    @Override
-    public HRegionInfo getRegionInfo() {
+    public RegionInfo getRegionInfo() {
      return region.getRegionInfo();
    }

    @Override
    public MetricRegistry getMetricRegistryForRegionServer() {
      return metricRegistry;
    }
  }
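SHARED_DATA_MAP keys the shared state by coprocessor class name, so every RegionEnvironment created for the same observer class on a region server hands out the same ConcurrentMap from getSharedData(). A minimal sketch of how an observer might use that map, assuming the interface-with-defaults form of RegionObserver in this branch; the class name and map key are illustrative only:

import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.wal.WALEdit;

// Illustrative sketch: counts puts across all regions of this server that load this class.
public class PutCountingObserver implements RegionObserver {
  @Override
  public void postPut(ObserverContext<RegionCoprocessorEnvironment> ctx, Put put,
      WALEdit edit, Durability durability) throws IOException {
    ConcurrentMap<String, Object> shared = ctx.getEnvironment().getSharedData();
    AtomicLong count = (AtomicLong) shared.computeIfAbsent("putCount", k -> new AtomicLong());
    count.incrementAndGet();
  }
}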
  static class TableCoprocessorAttribute {
    private Path path;
    private String className;
    private int priority;
    private Configuration conf;

    public TableCoprocessorAttribute(Path path, String className, int priority,
        Configuration conf) {
      this.path = path;
      this.className = className;
      this.priority = priority;
      this.conf = conf;
    }

    public Path getPath() {
      return path;
    }

    public String getClassName() {
      return className;
    }

    public int getPriority() {
      return priority;
    }

    public Configuration getConf() {
      return conf;
    }
  }

  /** The region server services */
  RegionServerServices rsServices;
  /** The region */
  Region region;

  /**
   * Constructor
   * @param region the region
   * @param rsServices interface to available region server functionality
   * @param conf the configuration
   */
  public RegionCoprocessorHost(final Region region,
      final RegionServerServices rsServices, final Configuration conf) {
    super(rsServices);
    this.conf = conf;
    this.rsServices = rsServices;
    this.region = region;
    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());

    // load system default cp's from configuration.
    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);

    // load system default cp's for user tables from configuration.
    if (!region.getRegionInfo().getTable().isSystemTable()) {
      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
    }

    // load Coprocessor From HDFS
    loadTableCoprocessors(conf);

    // now check whether any coprocessor implements postScannerFilterRow
    boolean hasCustomPostScannerFilterRow = false;
    out: for (RegionEnvironment env: coprocessors) {
      if (env.getInstance() instanceof RegionObserver) {
        Class<?> clazz = env.getInstance().getClass();
        for(;;) {
          if (clazz == null) {
            // we must have directly implemented RegionObserver
            hasCustomPostScannerFilterRow = true;
            break out;
          }
          try {
            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
              InternalScanner.class, Cell.class, boolean.class);
            // this coprocessor has a custom version of postScannerFilterRow
            hasCustomPostScannerFilterRow = true;
            break out;
          } catch (NoSuchMethodException ignore) {
          }
          // the deprecated signature still exists
          try {
            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
            // this coprocessor has a custom version of postScannerFilterRow
            hasCustomPostScannerFilterRow = true;
            break out;
          } catch (NoSuchMethodException ignore) {
          }
          clazz = clazz.getSuperclass();
        }
      }
    }
    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
  }
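The constructor above loads system coprocessors from two configuration keys (REGION_COPROCESSOR_CONF_KEY for every region, USER_REGION_COPROCESSOR_CONF_KEY only for regions of non-system tables) before turning to table-declared ones. A hedged sketch of wiring that up, assuming the usual key names hbase.coprocessor.region.classes and hbase.coprocessor.user.region.classes and illustrative observer classes:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CoprocessorConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Loaded into every region's RegionCoprocessorHost (REGION_COPROCESSOR_CONF_KEY).
    conf.set("hbase.coprocessor.region.classes", "com.example.AuditObserver");
    // Loaded only for regions of user (non-system) tables (USER_REGION_COPROCESSOR_CONF_KEY).
    conf.set("hbase.coprocessor.user.region.classes", "com.example.QuotaObserver");
    System.out.println(conf.get("hbase.coprocessor.region.classes"));
  }
}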
  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
      TableDescriptor htd) {
    List<TableCoprocessorAttribute> result = Lists.newArrayList();
    for (Map.Entry<Bytes, Bytes> e: htd.getValues().entrySet()) {
      String key = Bytes.toString(e.getKey().get()).trim();
      if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
        String spec = Bytes.toString(e.getValue().get()).trim();
        // found one
        try {
          Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
          if (matcher.matches()) {
            // jar file path can be empty if the cp class can be loaded
            // from class loader.
            Path path = matcher.group(1).trim().isEmpty() ?
                null : new Path(matcher.group(1).trim());
            String className = matcher.group(2).trim();
            if (className.isEmpty()) {
              LOG.error("Malformed table coprocessor specification: key=" +
                  key + ", spec: " + spec);
              continue;
            }
            String priorityStr = matcher.group(3).trim();
            int priority = priorityStr.isEmpty() ?
                Coprocessor.PRIORITY_USER : Integer.parseInt(priorityStr);
            String cfgSpec = null;
            try {
              cfgSpec = matcher.group(4);
            } catch (IndexOutOfBoundsException ex) {
              // ignore
            }
            Configuration ourConf;
            if (cfgSpec != null && !cfgSpec.trim().equals("|")) {
              cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
              // do an explicit deep copy of the passed configuration
              ourConf = new Configuration(false);
              HBaseConfiguration.merge(ourConf, conf);
              Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
              while (m.find()) {
                ourConf.set(m.group(1), m.group(2));
              }
            } else {
              ourConf = conf;
            }
            result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
          } else {
            LOG.error("Malformed table coprocessor specification: key=" + key +
                ", spec: " + spec);
          }
        } catch (Exception ioe) {
          LOG.error("Malformed table coprocessor specification: key=" + key +
              ", spec: " + spec);
        }
      }
    }
    return result;
  }
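getTableCoprocessorAttrsFromSchema() scans table-descriptor values whose keys match CP_HTD_ATTR_KEY_PATTERN (coprocessor$1, coprocessor$2, ...) and whose values follow the "<jar path>|<class name>|<priority>|<key=value,key=value>" layout matched by CP_HTD_ATTR_VALUE_PATTERN; per the parser above, the path may be empty and an empty priority falls back to Coprocessor.PRIORITY_USER. A sketch of declaring such an attribute, assuming HTableDescriptor.setValue and an illustrative jar path, class, and priority:

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;

public class CoprocessorSpecExample {
  public static void main(String[] args) {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("demo"));
    // "<jar path>|<class name>|<priority>|<key=value,key=value>"
    htd.setValue("coprocessor$1",
        "hdfs:///hbase/cp/demo-cp.jar|com.example.DemoRegionObserver|1001|arg1=1,arg2=2");
    System.out.println(htd.getValue("coprocessor$1"));
  }
}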
  /**
   * Sanity check the table coprocessor attributes of the supplied schema. Will
   * throw an exception if there is a problem.
   * @param conf
   * @param htd
   * @throws IOException
   */
  public static void testTableCoprocessorAttrs(final Configuration conf,
      final TableDescriptor htd) throws IOException {
    String pathPrefix = UUID.randomUUID().toString();
    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
      if (attr.getPriority() < 0) {
        throw new IOException("Priority for coprocessor " + attr.getClassName() +
            " cannot be less than 0");
      }
      ClassLoader old = Thread.currentThread().getContextClassLoader();
      try {
        ClassLoader cl;
        if (attr.getPath() != null) {
          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
              CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
        } else {
          cl = CoprocessorHost.class.getClassLoader();
        }
        Thread.currentThread().setContextClassLoader(cl);
        cl.loadClass(attr.getClassName());
      } catch (ClassNotFoundException e) {
        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
      } finally {
        Thread.currentThread().setContextClassLoader(old);
      }
    }
  }
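Because testTableCoprocessorAttrs() is public and static, it can serve as a pre-flight check before a schema carrying coprocessor attributes is applied. A small sketch of such a check, assuming a TableDescriptor built as in the previous example:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;

public class CoprocessorPreflightCheck {
  // Throws IOException if a declared priority is negative or a declared class cannot be loaded.
  static void validate(TableDescriptor htd) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
  }
}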
  void loadTableCoprocessors(final Configuration conf) {
    boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
        DEFAULT_COPROCESSORS_ENABLED);
    boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
        DEFAULT_USER_COPROCESSORS_ENABLED);
    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
      return;
    }

    // scan the table attributes for coprocessor load specifications
    // initialize the coprocessors
    List<RegionEnvironment> configured = new ArrayList<>();
    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
        region.getTableDescriptor())) {
      // Load encompasses classloading and coprocessor initialization
      try {
        RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
            attr.getConf());
        configured.add(env);
        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
            region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
      } catch (Throwable t) {
        // Coprocessor failed to load, do we abort on error?
        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
          abortServer(attr.getClassName(), t);
        } else {
          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
        }
      }
    }
    // add together to coprocessor set for COW efficiency
    coprocessors.addAll(configured);
  }
  @Override
  public RegionEnvironment createEnvironment(Class<?> implClass,
      Coprocessor instance, int priority, int seq, Configuration conf) {
    // Check if it's an Endpoint.
    // Due to current dynamic protocol design, Endpoint
    // uses a different way to be registered and executed.
    // It uses a visitor pattern to invoke registered Endpoint
    // method.
    for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
      Class<?> c = (Class<?>) itf;
      if (CoprocessorService.class.isAssignableFrom(c)) {
        region.registerService( ((CoprocessorService)instance).getService() );
      }
    }
    ConcurrentMap<String, Object> classData;
    // make sure only one thread can add maps
    synchronized (SHARED_DATA_MAP) {
      // as long as at least one RegionEnvironment holds on to its classData it will
      // remain in this map
      classData =
          SHARED_DATA_MAP.computeIfAbsent(implClass.getName(), k -> new ConcurrentHashMap<>());
    }
    return new RegionEnvironment(instance, priority, seq, conf, region,
        rsServices, classData);
  }
  /**
   * Invoked before a region open.
   *
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public void preOpen() throws IOException {
    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
      @Override
      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
          throws IOException {
        oserver.preOpen(ctx);
      }
    });
  }

  /**
   * Invoked after a region open
   */
  public void postOpen() {
    try {
      execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
        @Override
        public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
            throws IOException {
          oserver.postOpen(ctx);
        }
      });
    } catch (IOException e) {
      LOG.warn(e);
    }
  }
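preOpen()/postOpen() above, together with the log-replay and close hooks that follow, drive the corresponding RegionObserver callbacks. A minimal sketch of an observer reacting to them, again assuming the interface-with-defaults form of RegionObserver; the logging is illustrative:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;

// Illustrative sketch: logs region open/close events delivered by RegionCoprocessorHost.
public class LifecycleLoggingObserver implements RegionObserver {
  private static final Log LOG = LogFactory.getLog(LifecycleLoggingObserver.class);

  @Override
  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> ctx) {
    LOG.info("Opened " + ctx.getEnvironment().getRegionInfo().getRegionNameAsString());
  }

  @Override
  public void postClose(ObserverContext<RegionCoprocessorEnvironment> ctx, boolean abortRequested) {
    LOG.info("Closed " + ctx.getEnvironment().getRegionInfo().getRegionNameAsString() +
        (abortRequested ? " (abort requested)" : ""));
  }
}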
  /**
   * Invoked after log replay on region
   */
  public void postLogReplay() {
    try {
      execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
        @Override
        public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
            throws IOException {
          oserver.postLogReplay(ctx);
        }
      });
    } catch (IOException e) {
      LOG.warn(e);
    }
  }

  /**
   * Invoked before a region is closed
   * @param abortRequested true if the server is aborting
   */
  public void preClose(final boolean abortRequested) throws IOException {
    execOperation(false, new RegionOperation() {
      @Override
      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
          throws IOException {
        oserver.preClose(ctx, abortRequested);
      }
    });
  }
  /**
   * Invoked after a region is closed
   * @param abortRequested true if the server is aborting
   */
  public void postClose(final boolean abortRequested) {
    try {
      execOperation(false, new RegionOperation() {
        @Override
        public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
            throws IOException {
          oserver.postClose(ctx, abortRequested);