Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 64C69200D1B for ; Wed, 27 Sep 2017 21:50:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 632991609C1; Wed, 27 Sep 2017 19:50:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 57D6F1609ED for ; Wed, 27 Sep 2017 21:50:23 +0200 (CEST) Received: (qmail 99448 invoked by uid 500); 27 Sep 2017 19:50:22 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 99290 invoked by uid 99); 27 Sep 2017 19:50:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Sep 2017 19:50:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 56CD4F5BBB; Wed, 27 Sep 2017 19:50:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: appy@apache.org To: commits@hbase.apache.org Date: Wed, 27 Sep 2017 19:50:23 -0000 Message-Id: In-Reply-To: <666a725b78d5495e8e5e56fc46087d1f@git.apache.org> References: <666a725b78d5495e8e5e56fc46087d1f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/7] hbase git commit: HBASE-17732 Coprocessor Design Improvements archived-at: Wed, 27 Sep 2017 19:50:30 -0000 http://git-wip-us.apache.org/repos/asf/hbase/blob/0c883a23/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java index c59e5e7..d6dbcd4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.quotas; import java.io.IOException; +import java.util.Optional; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -24,6 +25,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -35,7 +37,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; * are deleted. */ @InterfaceAudience.Private -public class MasterSpaceQuotaObserver implements MasterObserver { +public class MasterSpaceQuotaObserver implements MasterCoprocessor, MasterObserver { public static final String REMOVE_QUOTA_ON_TABLE_DELETE = "hbase.quota.remove.on.table.delete"; public static final boolean REMOVE_QUOTA_ON_TABLE_DELETE_DEFAULT = true; @@ -44,6 +46,11 @@ public class MasterSpaceQuotaObserver implements MasterObserver { private boolean quotasEnabled = false; @Override + public Optional getMasterObserver() { + return Optional.of(this); + } + + @Override public void start(CoprocessorEnvironment ctx) throws IOException { this.cpEnv = ctx; this.conf = cpEnv.getConfiguration(); http://git-wip-us.apache.org/repos/asf/hbase/blob/0c883a23/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 19491b4..84e9aa5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -33,7 +33,6 @@ import com.google.protobuf.Message; import com.google.protobuf.Service; import org.apache.commons.collections4.map.AbstractReferenceMap; import org.apache.commons.collections4.map.ReferenceMap; -import org.apache.commons.lang3.ClassUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -56,11 +55,15 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; +import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; +import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; import org.apache.hadoop.hbase.coprocessor.EndpointObserver; import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; @@ -68,7 +71,6 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; @@ -91,7 +93,7 @@ import org.apache.yetus.audience.InterfaceStability; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving public class RegionCoprocessorHost - extends CoprocessorHost { + extends CoprocessorHost { private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class); // The shared data map @@ -103,10 +105,10 @@ public class RegionCoprocessorHost private final boolean hasCustomPostScannerFilterRow; /** - * + * * Encapsulation of the environment of each coprocessor */ - static class RegionEnvironment extends CoprocessorHost.Environment + static class RegionEnvironment extends BaseEnvironment implements RegionCoprocessorEnvironment { private Region region; @@ -119,7 +121,7 @@ public class RegionCoprocessorHost * @param impl the coprocessor instance * @param priority chaining priority */ - public RegionEnvironment(final Coprocessor impl, final int priority, + public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq, final Configuration conf, final Region region, final RegionServerServices services, final ConcurrentMap sharedData) { super(impl, priority, seq, conf); @@ -142,6 +144,7 @@ public class RegionCoprocessorHost return rsServices; } + @Override public void shutdown() { super.shutdown(); MetricsCoprocessor.removeRegistry(this.metricRegistry); @@ -226,7 +229,7 @@ public class RegionCoprocessorHost // now check whether any coprocessor implements postScannerFilterRow boolean hasCustomPostScannerFilterRow = false; - out: for (RegionEnvironment env: coprocessors) { + out: for (RegionCoprocessorEnvironment env: coprocEnvironments) { if (env.getInstance() instanceof RegionObserver) { Class clazz = env.getInstance().getClass(); for(;;) { @@ -361,13 +364,16 @@ public class RegionCoprocessorHost // scan the table attributes for coprocessor load specifications // initialize the coprocessors - List configured = new ArrayList<>(); + List configured = new ArrayList<>(); for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, region.getTableDescriptor())) { // Load encompasses classloading and coprocessor initialization try { - RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(), - attr.getConf()); + RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(), + attr.getPriority(), attr.getConf()); + if (env == null) { + continue; + } configured.add(env); LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " + region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); @@ -381,60 +387,101 @@ public class RegionCoprocessorHost } } // add together to coprocessor set for COW efficiency - coprocessors.addAll(configured); + coprocEnvironments.addAll(configured); } @Override - public RegionEnvironment createEnvironment(Class implClass, - Coprocessor instance, int priority, int seq, Configuration conf) { - // Check if it's an Endpoint. - // Due to current dynamic protocol design, Endpoint - // uses a different way to be registered and executed. - // It uses a visitor pattern to invoke registered Endpoint - // method. - for (Object itf : ClassUtils.getAllInterfaces(implClass)) { - Class c = (Class) itf; - if (CoprocessorService.class.isAssignableFrom(c)) { - region.registerService( ((CoprocessorService)instance).getService() ); - } - } + public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, + Configuration conf) { + // Due to current dynamic protocol design, Endpoint uses a different way to be registered and + // executed. It uses a visitor pattern to invoke registered Endpoint method. + instance.getService().ifPresent(region::registerService); ConcurrentMap classData; // make sure only one thread can add maps synchronized (SHARED_DATA_MAP) { // as long as at least one RegionEnvironment holds on to its classData it will // remain in this map classData = - SHARED_DATA_MAP.computeIfAbsent(implClass.getName(), k -> new ConcurrentHashMap<>()); + SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), + k -> new ConcurrentHashMap<>()); } return new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); } + @Override + public RegionCoprocessor checkAndGetInstance(Class implClass) + throws InstantiationException, IllegalAccessException { + if (RegionCoprocessor.class.isAssignableFrom(implClass)) { + return (RegionCoprocessor)implClass.newInstance(); + } else if (CoprocessorService.class.isAssignableFrom(implClass)) { + // For backward compatibility with old CoprocessorService impl which don't extend + // RegionCoprocessor. + return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService( + (CoprocessorService)implClass.newInstance()); + } else { + LOG.error(implClass.getName() + " is not of type RegionCoprocessor. Check the " + + "configuration " + CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); + return null; + } + } + + private ObserverGetter regionObserverGetter = + RegionCoprocessor::getRegionObserver; + + private ObserverGetter endpointObserverGetter = + RegionCoprocessor::getEndpointObserver; + + abstract class RegionObserverOperation extends ObserverOperationWithoutResult { + public RegionObserverOperation() { + super(regionObserverGetter); + } + + public RegionObserverOperation(User user) { + super(regionObserverGetter, user); + } + } + + abstract class BulkLoadObserverOperation extends + ObserverOperationWithoutResult { + public BulkLoadObserverOperation(User user) { + super(RegionCoprocessor::getBulkLoadObserver, user); + } + } + + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // Observer operations + ////////////////////////////////////////////////////////////////////////////////////////////////// + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // Observer operations + ////////////////////////////////////////////////////////////////////////////////////////////////// + /** * Invoked before a region open. * * @throws IOException Signals that an I/O exception has occurred. */ public void preOpen() throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preOpen(ctx); + public void call(RegionObserver observer) throws IOException { + observer.preOpen(this); } }); } + /** * Invoked after a region open */ public void postOpen() { try { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postOpen(ctx); + public void call(RegionObserver observer) throws IOException { + observer.postOpen(this); } }); } catch (IOException e) { @@ -447,11 +494,10 @@ public class RegionCoprocessorHost */ public void postLogReplay() { try { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postLogReplay(ctx); + public void call(RegionObserver observer) throws IOException { + observer.postLogReplay(this); } }); } catch (IOException e) { @@ -464,11 +510,10 @@ public class RegionCoprocessorHost * @param abortRequested true if the server is aborting */ public void preClose(final boolean abortRequested) throws IOException { - execOperation(false, new RegionOperation() { + execOperation(false, new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preClose(ctx, abortRequested); + public void call(RegionObserver observer) throws IOException { + observer.preClose(this, abortRequested); } }); } @@ -479,14 +524,15 @@ public class RegionCoprocessorHost */ public void postClose(final boolean abortRequested) { try { - execOperation(false, new RegionOperation() { + execOperation(false, new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postClose(ctx, abortRequested); + public void call(RegionObserver observer) throws IOException { + observer.postClose(this, abortRequested); } - public void postEnvCall(RegionEnvironment env) { - shutdown(env); + + @Override + public void postEnvCall() { + shutdown(this.getEnvironment()); } }); } catch (IOException e) { @@ -499,18 +545,19 @@ public class RegionCoprocessorHost * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, * InternalScanner, CompactionLifeCycleTracker, long)} */ - public InternalScanner preCompactScannerOpen(HStore store, List scanners, - ScanType scanType, long earliestPutTs, CompactionLifeCycleTracker tracker, User user, - long readPoint) throws IOException { - return execOperationWithResult(null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult(user) { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType, - earliestPutTs, getResult(), tracker, readPoint)); - } - }); + public InternalScanner preCompactScannerOpen(final HStore store, + final List scanners, final ScanType scanType, final long earliestPutTs, + final CompactionLifeCycleTracker tracker, final User user, final long readPoint) + throws IOException { + return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult( + regionObserverGetter, user) { + @Override + public InternalScanner call(RegionObserver observer) throws IOException { + return observer.preCompactScannerOpen(this, store, scanners, scanType, + earliestPutTs, getResult(), tracker, readPoint); + } + }); } /** @@ -522,13 +569,12 @@ public class RegionCoprocessorHost * @return If {@code true}, skip the normal selection process and use the current list * @throws IOException */ - public boolean preCompactSelection(HStore store, List candidates, - CompactionLifeCycleTracker tracker, User user) throws IOException { - return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { + public boolean preCompactSelection(final HStore store, final List candidates, + final CompactionLifeCycleTracker tracker, final User user) throws IOException { + return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preCompactSelection(ctx, store, candidates, tracker); + public void call(RegionObserver observer) throws IOException { + observer.preCompactSelection(this, store, candidates, tracker); } }); } @@ -540,13 +586,12 @@ public class RegionCoprocessorHost * @param selected The store files selected to compact * @param tracker used to track the life cycle of a compaction */ - public void postCompactSelection(HStore store, ImmutableList selected, - CompactionLifeCycleTracker tracker, User user) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { + public void postCompactSelection(final HStore store, final ImmutableList selected, + final CompactionLifeCycleTracker tracker, final User user) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postCompactSelection(ctx, store, selected, tracker); + public void call(RegionObserver observer) throws IOException { + observer.postCompactSelection(this, store, selected, tracker); } }); } @@ -559,16 +604,17 @@ public class RegionCoprocessorHost * @param tracker used to track the life cycle of a compaction * @throws IOException */ - public InternalScanner preCompact(HStore store, InternalScanner scanner, ScanType scanType, - CompactionLifeCycleTracker tracker, User user) throws IOException { - return execOperationWithResult(false, scanner, - coprocessors.isEmpty() ? null : new RegionOperationWithResult(user) { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preCompact(ctx, store, getResult(), scanType, tracker)); - } - }); + public InternalScanner preCompact(final HStore store, final InternalScanner scanner, + final ScanType scanType, final CompactionLifeCycleTracker tracker, final User user) + throws IOException { + return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult( + regionObserverGetter, user) { + @Override + public InternalScanner call(RegionObserver observer) throws IOException { + return observer.preCompact(this, store, getResult(), scanType, tracker); + } + }); } /** @@ -578,13 +624,12 @@ public class RegionCoprocessorHost * @param tracker used to track the life cycle of a compaction * @throws IOException */ - public void postCompact(HStore store, HStoreFile resultFile, CompactionLifeCycleTracker tracker, - User user) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { + public void postCompact(final HStore store, final HStoreFile resultFile, + final CompactionLifeCycleTracker tracker, final User user) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postCompact(ctx, store, resultFile, tracker); + public void call(RegionObserver observer) throws IOException { + observer.postCompact(this, store, resultFile, tracker); } }); } @@ -595,14 +640,13 @@ public class RegionCoprocessorHost */ public InternalScanner preFlush(HStore store, final InternalScanner scanner) throws IOException { - return execOperationWithResult(false, scanner, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preFlush(ctx, store, getResult())); - } - }); + return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public InternalScanner call(RegionObserver observer) throws IOException { + return observer.preFlush(this, store, getResult()); + } + }); } /** @@ -610,11 +654,10 @@ public class RegionCoprocessorHost * @throws IOException */ public void preFlush() throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preFlush(ctx); + public void call(RegionObserver observer) throws IOException { + observer.preFlush(this); } }); } @@ -623,16 +666,15 @@ public class RegionCoprocessorHost * See * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} */ - public InternalScanner preFlushScannerOpen(HStore store, List scanners, - long readPoint) throws IOException { - return execOperationWithResult(null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint)); - } - }); + public InternalScanner preFlushScannerOpen(final HStore store, + final List scanners, final long readPoint) throws IOException { + return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public InternalScanner call(RegionObserver observer) throws IOException { + return observer.preFlushScannerOpen(this, store, scanners, getResult(), readPoint); + } + }); } /** @@ -640,11 +682,10 @@ public class RegionCoprocessorHost * @throws IOException */ public void postFlush() throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postFlush(ctx); + public void call(RegionObserver observer) throws IOException { + observer.postFlush(this); } }); } @@ -653,12 +694,11 @@ public class RegionCoprocessorHost * Invoked after a memstore flush * @throws IOException */ - public void postFlush(HStore store, HStoreFile storeFile) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + public void postFlush(final HStore store, final HStoreFile storeFile) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postFlush(ctx, store, storeFile); + public void call(RegionObserver observer) throws IOException { + observer.postFlush(this, store, storeFile); } }); } @@ -671,11 +711,10 @@ public class RegionCoprocessorHost */ public boolean preGet(final Get get, final List results) throws IOException { - return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preGetOp(ctx, get, results); + public void call(RegionObserver observer) throws IOException { + observer.preGetOp(this, get, results); } }); } @@ -687,11 +726,10 @@ public class RegionCoprocessorHost */ public void postGet(final Get get, final List results) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postGetOp(ctx, get, results); + public void call(RegionObserver observer) throws IOException { + observer.postGetOp(this, get, results); } }); } @@ -703,14 +741,13 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public Boolean preExists(final Get get) throws IOException { - return execOperationWithResult(true, false, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preExists(ctx, get, getResult())); - } - }); + return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.preExists(this, get, getResult()); + } + }); } /** @@ -721,14 +758,13 @@ public class RegionCoprocessorHost */ public boolean postExists(final Get get, boolean exists) throws IOException { - return execOperationWithResult(exists, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postExists(ctx, get, getResult())); - } - }); + return execOperationWithResult(exists, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.postExists(this, get, getResult()); + } + }); } /** @@ -740,11 +776,10 @@ public class RegionCoprocessorHost */ public boolean prePut(final Put put, final WALEdit edit, final Durability durability) throws IOException { - return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.prePut(ctx, put, edit, durability); + public void call(RegionObserver observer) throws IOException { + observer.prePut(this, put, edit, durability); } }); } @@ -761,11 +796,10 @@ public class RegionCoprocessorHost */ public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv, final byte[] byteNow, final Get get) throws IOException { - return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get); + public void call(RegionObserver observer) throws IOException { + observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); } }); } @@ -778,11 +812,10 @@ public class RegionCoprocessorHost */ public void postPut(final Put put, final WALEdit edit, final Durability durability) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postPut(ctx, put, edit, durability); + public void call(RegionObserver observer) throws IOException { + observer.postPut(this, put, edit, durability); } }); } @@ -796,11 +829,10 @@ public class RegionCoprocessorHost */ public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) throws IOException { - return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preDelete(ctx, delete, edit, durability); + public void call(RegionObserver observer) throws IOException { + observer.preDelete(this, delete, edit, durability); } }); } @@ -813,11 +845,10 @@ public class RegionCoprocessorHost */ public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postDelete(ctx, delete, edit, durability); + public void call(RegionObserver observer) throws IOException { + observer.postDelete(this, delete, edit, durability); } }); } @@ -829,11 +860,10 @@ public class RegionCoprocessorHost */ public boolean preBatchMutate( final MiniBatchOperationInProgress miniBatchOp) throws IOException { - return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preBatchMutate(ctx, miniBatchOp); + public void call(RegionObserver observer) throws IOException { + observer.preBatchMutate(this, miniBatchOp); } }); } @@ -844,11 +874,10 @@ public class RegionCoprocessorHost */ public void postBatchMutate( final MiniBatchOperationInProgress miniBatchOp) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postBatchMutate(ctx, miniBatchOp); + public void call(RegionObserver observer) throws IOException { + observer.postBatchMutate(this, miniBatchOp); } }); } @@ -856,11 +885,10 @@ public class RegionCoprocessorHost public void postBatchMutateIndispensably( final MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success); + public void call(RegionObserver observer) throws IOException { + observer.postBatchMutateIndispensably(this, miniBatchOp, success); } }); } @@ -880,15 +908,14 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOperator op, final ByteArrayComparable comparator, final Put put) throws IOException { - return execOperationWithResult(true, false, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preCheckAndPut(ctx, row, family, qualifier, - op, comparator, put, getResult())); - } - }); + return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.preCheckAndPut(this, row, family, qualifier, + op, comparator, put, getResult()); + } + }); } /** @@ -902,18 +929,17 @@ public class RegionCoprocessorHost * be bypassed, or null otherwise * @throws IOException e */ - public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family, - final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator, - final Put put) throws IOException { - return execOperationWithResult(true, false, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier, - op, comparator, put, getResult())); - } - }); + public Boolean preCheckAndPutAfterRowLock( + final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, + final ByteArrayComparable comparator, final Put put) throws IOException { + return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier, + op, comparator, put, getResult()); + } + }); } /** @@ -929,15 +955,14 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOperator op, final ByteArrayComparable comparator, final Put put, boolean result) throws IOException { - return execOperationWithResult(result, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postCheckAndPut(ctx, row, family, qualifier, - op, comparator, put, getResult())); - } - }); + return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.postCheckAndPut(this, row, family, qualifier, + op, comparator, put, getResult()); + } + }); } /** @@ -955,15 +980,14 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOperator op, final ByteArrayComparable comparator, final Delete delete) throws IOException { - return execOperationWithResult(true, false, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preCheckAndDelete(ctx, row, family, - qualifier, op, comparator, delete, getResult())); - } - }); + return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.preCheckAndDelete(this, row, family, + qualifier, op, comparator, delete, getResult()); + } + }); } /** @@ -978,17 +1002,16 @@ public class RegionCoprocessorHost * @throws IOException e */ public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, - final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator, - final Delete delete) throws IOException { - return execOperationWithResult(true, false, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row, - family, qualifier, op, comparator, delete, getResult())); - } - }); + final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator, + final Delete delete) throws IOException { + return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.preCheckAndDeleteAfterRowLock(this, row, + family, qualifier, op, comparator, delete, getResult()); + } + }); } /** @@ -1004,15 +1027,14 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOperator op, final ByteArrayComparable comparator, final Delete delete, boolean result) throws IOException { - return execOperationWithResult(result, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postCheckAndDelete(ctx, row, family, - qualifier, op, comparator, delete, getResult())); - } - }); + return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.postCheckAndDelete(this, row, family, + qualifier, op, comparator, delete, getResult()); + } + }); } /** @@ -1022,14 +1044,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preAppend(final Append append) throws IOException { - return execOperationWithResult(true, null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preAppend(ctx, append)); - } - }); + return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Result call(RegionObserver observer) throws IOException { + return observer.preAppend(this, append); + } + }); } /** @@ -1039,14 +1060,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preAppendAfterRowLock(final Append append) throws IOException { - return execOperationWithResult(true, null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preAppendAfterRowLock(ctx, append)); - } - }); + return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Result call(RegionObserver observer) throws IOException { + return observer.preAppendAfterRowLock(this, append); + } + }); } /** @@ -1056,14 +1076,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preIncrement(final Increment increment) throws IOException { - return execOperationWithResult(true, null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preIncrement(ctx, increment)); - } - }); + return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Result call(RegionObserver observer) throws IOException { + return observer.preIncrement(this, increment); + } + }); } /** @@ -1073,14 +1092,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preIncrementAfterRowLock(final Increment increment) throws IOException { - return execOperationWithResult(true, null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preIncrementAfterRowLock(ctx, increment)); - } - }); + return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Result call(RegionObserver observer) throws IOException { + return observer.preIncrementAfterRowLock(this, increment); + } + }); } /** @@ -1089,14 +1107,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result postAppend(final Append append, final Result result) throws IOException { - return execOperationWithResult(result, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postAppend(ctx, append, result)); - } - }); + return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Result call(RegionObserver observer) throws IOException { + return observer.postAppend(this, append, result); + } + }); } /** @@ -1105,14 +1122,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result postIncrement(final Increment increment, Result result) throws IOException { - return execOperationWithResult(result, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postIncrement(ctx, increment, getResult())); - } - }); + return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Result call(RegionObserver observer) throws IOException { + return observer.postIncrement(this, increment, getResult()); + } + }); } /** @@ -1122,30 +1138,28 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public RegionScanner preScannerOpen(final Scan scan) throws IOException { - return execOperationWithResult(true, null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preScannerOpen(ctx, scan, getResult())); - } - }); + return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public RegionScanner call(RegionObserver observer) throws IOException { + return observer.preScannerOpen(this, scan, getResult()); + } + }); } /** * See * {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner, long)} */ - public KeyValueScanner preStoreScannerOpen(HStore store, Scan scan, - NavigableSet targetCols, long readPt) throws IOException { - return execOperationWithResult(null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult(), readPt)); - } - }); + public KeyValueScanner preStoreScannerOpen(final HStore store, final Scan scan, + final NavigableSet targetCols, final long readPt) throws IOException { + return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public KeyValueScanner call(RegionObserver observer) throws IOException { + return observer.preStoreScannerOpen(this, store, scan, targetCols, getResult(), readPt); + } + }); } /** @@ -1155,14 +1169,13 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { - return execOperationWithResult(s, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postScannerOpen(ctx, scan, getResult())); - } - }); + return execOperationWithResult(s, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public RegionScanner call(RegionObserver observer) throws IOException { + return observer.postScannerOpen(this, scan, getResult()); + } + }); } /** @@ -1175,14 +1188,13 @@ public class RegionCoprocessorHost */ public Boolean preScannerNext(final InternalScanner s, final List results, final int limit) throws IOException { - return execOperationWithResult(true, false, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preScannerNext(ctx, s, results, limit, getResult())); - } - }); + return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.preScannerNext(this, s, results, limit, getResult()); + } + }); } /** @@ -1196,14 +1208,13 @@ public class RegionCoprocessorHost public boolean postScannerNext(final InternalScanner s, final List results, final int limit, boolean hasMore) throws IOException { - return execOperationWithResult(hasMore, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postScannerNext(ctx, s, results, limit, getResult())); - } - }); + return execOperationWithResult(hasMore, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.postScannerNext(this, s, results, limit, getResult()); + } + }); } /** @@ -1218,14 +1229,13 @@ public class RegionCoprocessorHost throws IOException { // short circuit for performance if (!hasCustomPostScannerFilterRow) return true; - return execOperationWithResult(true, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postScannerFilterRow(ctx, s, curRowCell, getResult())); - } - }); + return execOperationWithResult(true, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.postScannerFilterRow(this, s, curRowCell, getResult()); + } + }); } /** @@ -1234,11 +1244,10 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public boolean preScannerClose(final InternalScanner s) throws IOException { - return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preScannerClose(ctx, s); + public void call(RegionObserver observer) throws IOException { + observer.preScannerClose(this, s); } }); } @@ -1247,11 +1256,10 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public void postScannerClose(final InternalScanner s) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postScannerClose(ctx, s); + public void call(RegionObserver observer) throws IOException { + observer.postScannerClose(this, s); } }); } @@ -1262,11 +1270,10 @@ public class RegionCoprocessorHost * @throws IOException Exception */ public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preReplayWALs(ctx, info, edits); + public void call(RegionObserver observer) throws IOException { + observer.preReplayWALs(this, info, edits); } }); } @@ -1277,11 +1284,10 @@ public class RegionCoprocessorHost * @throws IOException Exception */ public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postReplayWALs(ctx, info, edits); + public void call(RegionObserver observer) throws IOException { + observer.postReplayWALs(this, info, edits); } }); } @@ -1295,11 +1301,10 @@ public class RegionCoprocessorHost */ public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { - return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preWALRestore(ctx, info, logKey, logEdit); + public void call(RegionObserver observer) throws IOException { + observer.preWALRestore(this, info, logKey, logEdit); } }); } @@ -1312,11 +1317,10 @@ public class RegionCoprocessorHost */ public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postWALRestore(ctx, info, logKey, logEdit); + public void call(RegionObserver observer) throws IOException { + observer.postWALRestore(this, info, logKey, logEdit); } }); } @@ -1327,31 +1331,28 @@ public class RegionCoprocessorHost * @throws IOException */ public boolean preBulkLoadHFile(final List> familyPaths) throws IOException { - return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preBulkLoadHFile(ctx, familyPaths); + public void call(RegionObserver observer) throws IOException { + observer.preBulkLoadHFile(this, familyPaths); } }); } public boolean preCommitStoreFile(final byte[] family, final List> pairs) throws IOException { - return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.preCommitStoreFile(ctx, family, pairs); + public void call(RegionObserver observer) throws IOException { + observer.preCommitStoreFile(this, family, pairs); } }); } public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postCommitStoreFile(ctx, family, srcPath, dstPath); + public void call(RegionObserver observer) throws IOException { + observer.postCommitStoreFile(this, family, srcPath, dstPath); } }); } @@ -1365,32 +1366,29 @@ public class RegionCoprocessorHost */ public boolean postBulkLoadHFile(final List> familyPaths, Map> map, boolean hasLoaded) throws IOException { - return execOperationWithResult(hasLoaded, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postBulkLoadHFile(ctx, familyPaths, map, getResult())); - } - }); + return execOperationWithResult(hasLoaded, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.postBulkLoadHFile(this, familyPaths, map, getResult()); + } + }); } public void postStartRegionOperation(final Operation op) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postStartRegionOperation(ctx, op); + public void call(RegionObserver observer) throws IOException { + observer.postStartRegionOperation(this, op); } }); } public void postCloseRegionOperation(final Operation op) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postCloseRegionOperation(ctx, op); + public void call(RegionObserver observer) throws IOException { + observer.postCloseRegionOperation(this, op); } }); } @@ -1409,14 +1407,14 @@ public class RegionCoprocessorHost public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, final Reference r) throws IOException { - return execOperationWithResult(null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult())); - } - }); + return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public StoreFileReader call(RegionObserver observer) throws IOException { + return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, + getResult()); + } + }); } /** @@ -1433,192 +1431,77 @@ public class RegionCoprocessorHost public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, final Reference r, final StoreFileReader reader) throws IOException { - return execOperationWithResult(reader, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult())); - } - }); + return execOperationWithResult(reader, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public StoreFileReader call(RegionObserver observer) throws IOException { + return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, + getResult()); + } + }); } public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation, final Cell oldCell, Cell newCell) throws IOException { - return execOperationWithResult(newCell, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult())); - } - }); + return execOperationWithResult(newCell, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public Cell call(RegionObserver observer) throws IOException { + return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult()); + } + }); } public Message preEndpointInvocation(final Service service, final String methodName, Message request) throws IOException { - return execOperationWithResult(request, - coprocessors.isEmpty() ? null : new EndpointOperationWithResult() { - @Override - public void call(EndpointObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult())); - } - }); + return execOperationWithResult(request, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(endpointObserverGetter) { + @Override + public Message call(EndpointObserver observer) throws IOException { + return observer.preEndpointInvocation(this, service, methodName, getResult()); + } + }); } public void postEndpointInvocation(final Service service, final String methodName, final Message request, final Message.Builder responseBuilder) throws IOException { - execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() { - @Override - public void call(EndpointObserver oserver, ObserverContext ctx) - throws IOException { - oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder); - } - }); + execOperation(coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithoutResult(endpointObserverGetter) { + @Override + public void call(EndpointObserver observer) throws IOException { + observer.postEndpointInvocation(this, service, methodName, request, responseBuilder); + } + }); } public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException { - return execOperationWithResult(tracker, - coprocessors.isEmpty() ? null : new RegionOperationWithResult() { - @Override - public void call(RegionObserver oserver, ObserverContext ctx) - throws IOException { - setResult(oserver.postInstantiateDeleteTracker(ctx, getResult())); - } - }); - } - - private static abstract class CoprocessorOperation - extends ObserverContext { - public CoprocessorOperation() { - this(RpcServer.getRequestUser()); - } - - public CoprocessorOperation(User user) { - super(user); - } - - public abstract void call(Coprocessor observer, - ObserverContext ctx) throws IOException; - public abstract boolean hasCall(Coprocessor observer); - public void postEnvCall(RegionEnvironment env) { } - } - - private static abstract class RegionOperation extends CoprocessorOperation { - public RegionOperation() { - } - - public RegionOperation(User user) { - super(user); - } - - public abstract void call(RegionObserver observer, - ObserverContext ctx) throws IOException; - - public boolean hasCall(Coprocessor observer) { - return observer instanceof RegionObserver; - } - - public void call(Coprocessor observer, ObserverContext ctx) - throws IOException { - call((RegionObserver)observer, ctx); - } - } - - private static abstract class RegionOperationWithResult extends RegionOperation { - public RegionOperationWithResult() { - } - - public RegionOperationWithResult(User user) { - super (user); - } - - private T result = null; - public void setResult(final T result) { this.result = result; } - public T getResult() { return this.result; } - } - - private static abstract class EndpointOperation extends CoprocessorOperation { - public abstract void call(EndpointObserver observer, - ObserverContext ctx) throws IOException; - - public boolean hasCall(Coprocessor observer) { - return observer instanceof EndpointObserver; - } - - public void call(Coprocessor observer, ObserverContext ctx) - throws IOException { - call((EndpointObserver)observer, ctx); - } - } - - private static abstract class EndpointOperationWithResult extends EndpointOperation { - private T result = null; - public void setResult(final T result) { this.result = result; } - public T getResult() { return this.result; } - } - - private boolean execOperation(final CoprocessorOperation ctx) - throws IOException { - return execOperation(true, ctx); - } - - private T execOperationWithResult(final T defaultValue, - final RegionOperationWithResult ctx) throws IOException { - if (ctx == null) return defaultValue; - ctx.setResult(defaultValue); - execOperation(true, ctx); - return ctx.getResult(); - } - - private T execOperationWithResult(final boolean ifBypass, final T defaultValue, - final RegionOperationWithResult ctx) throws IOException { - boolean bypass = false; - T result = defaultValue; - if (ctx != null) { - ctx.setResult(defaultValue); - bypass = execOperation(true, ctx); - result = ctx.getResult(); - } - return bypass == ifBypass ? result : null; + return execOperationWithResult(tracker, coprocEnvironments.isEmpty() ? null : + new ObserverOperationWithResult(regionObserverGetter) { + @Override + public DeleteTracker call(RegionObserver observer) throws IOException { + return observer.postInstantiateDeleteTracker(this, getResult()); + } + }); } - private T execOperationWithResult(final T defaultValue, - final EndpointOperationWithResult ctx) throws IOException { - if (ctx == null) return defaultValue; - ctx.setResult(defaultValue); - execOperation(true, ctx); - return ctx.getResult(); + ///////////////////////////////////////////////////////////////////////////////////////////////// + // BulkLoadObserver hooks + ///////////////////////////////////////////////////////////////////////////////////////////////// + public void prePrepareBulkLoad(User user) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : + new BulkLoadObserverOperation(user) { + @Override protected void call(BulkLoadObserver observer) throws IOException { + observer.prePrepareBulkLoad(this); + } + }); } - private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx) - throws IOException { - boolean bypass = false; - List envs = coprocessors.get(); - for (int i = 0; i < envs.size(); i++) { - RegionEnvironment env = envs.get(i); - Coprocessor observer = env.getInstance(); - if (ctx.hasCall(observer)) { - ctx.prepare(env); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ctx.call(observer, ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (earlyExit && ctx.shouldComplete()) { - break; - } - } - - ctx.postEnvCall(env); - } - return bypass; + public void preCleanupBulkLoad(User user) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : + new BulkLoadObserverOperation(user) { + @Override protected void call(BulkLoadObserver observer) throws IOException { + observer.preCleanupBulkLoad(this); + } + }); } }