Return-Path: Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: (qmail 77768 invoked from network); 15 Dec 2010 10:12:27 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 15 Dec 2010 10:12:27 -0000 Received: (qmail 24269 invoked by uid 500); 15 Dec 2010 10:12:27 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 23949 invoked by uid 500); 15 Dec 2010 10:12:25 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 23935 invoked by uid 99); 15 Dec 2010 10:12:24 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Dec 2010 10:12:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Dec 2010 10:12:20 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 3606223888A6; Wed, 15 Dec 2010 10:12:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1049471 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/coprocessor/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/test/java/org/apache/hadoop/hbase/coprocessor/ Date: Wed, 15 Dec 2010 10:12:00 -0000 To: commits@hbase.apache.org From: apurtell@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101215101200.3606223888A6@eris.apache.org> Author: apurtell Date: Wed Dec 15 10:11:59 2010 New Revision: 1049471 URL: http://svn.apache.org/viewvc?rev=1049471&view=rev Log: HBASE-3348 Coprocessors: Allow Observers to completely override base function Modified: hbase/trunk/CHANGES.txt hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java Modified: hbase/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1049471&r1=1049470&r2=1049471&view=diff ============================================================================== --- hbase/trunk/CHANGES.txt (original) +++ hbase/trunk/CHANGES.txt Wed Dec 15 10:11:59 2010 @@ -36,6 +36,8 @@ Release 0.91.0 - Unreleased HBASE-1861 Multi-Family support for bulk upload tools HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot HBASE-3328 Added Admin API to specify explicit split points + HBASE-3345 Coprocessors: Allow observers to completely override base + function NEW FEATURES Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java?rev=1049471&r1=1049470&r2=1049471&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java Wed Dec 15 10:11:59 2010 @@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Pu import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; + import java.io.IOException; /** @@ -71,70 +73,65 @@ public abstract class BaseRegionObserver @Override public void preGetClosestRowBefore(final CoprocessorEnvironment e, - final byte [] row, final byte [] family) + final byte [] row, final byte [] family, final Result result) throws IOException { } @Override - public Result postGetClosestRowBefore(final CoprocessorEnvironment e, - byte[] row, byte[] family, final Result result) - throws IOException { - return result; + public void postGetClosestRowBefore(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final Result result) + throws IOException { } @Override - public Get preGet(final CoprocessorEnvironment e, final Get get) - throws IOException { - return get; + public void preGet(final CoprocessorEnvironment e, final Get get, + final List results) throws IOException { } @Override - public List postGet(final CoprocessorEnvironment e, final Get get, - List results) throws IOException { - return results; + public void postGet(final CoprocessorEnvironment e, final Get get, + final List results) throws IOException { } @Override - public Get preExists(final CoprocessorEnvironment e, final Get get) - throws IOException { - return get; + public boolean preExists(final CoprocessorEnvironment e, final Get get, + final boolean exists) throws IOException { + return exists; } @Override - public boolean postExists(final CoprocessorEnvironment e, - final Get get, boolean exists) - throws IOException { + public boolean postExists(final CoprocessorEnvironment e, final Get get, + boolean exists) throws IOException { return exists; } @Override - public Map> prePut(final CoprocessorEnvironment e, - final Map> familyMap) throws IOException { - return familyMap; + public void prePut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { } @Override - public void postPut(final CoprocessorEnvironment e, - final Map> familyMap) - throws IOException { + public void postPut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { } @Override - public Map> preDelete(final CoprocessorEnvironment e, - final Map> familyMap) throws IOException { - return familyMap; + public void preDelete(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { } @Override - public void postDelete(CoprocessorEnvironment e, - Map> familyMap) throws IOException { + public void postDelete(final CoprocessorEnvironment e, + final Map> familyMap, final boolean writeToWAL) + throws IOException { } @Override - public Put preCheckAndPut(final CoprocessorEnvironment e, + public boolean preCheckAndPut(final CoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, - final byte [] value, final Put put) throws IOException { - return put; + final byte [] value, final Put put, final boolean result) + throws IOException { + return result; } @Override @@ -146,18 +143,18 @@ public abstract class BaseRegionObserver } @Override - public Delete preCheckAndDelete(final CoprocessorEnvironment e, + public boolean preCheckAndDelete(final CoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, - final byte [] value, final Delete delete) - throws IOException { - return delete; + final byte [] value, final Delete delete, final boolean result) + throws IOException { + return result; } @Override public boolean postCheckAndDelete(final CoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Delete delete, final boolean result) - throws IOException { + throws IOException { return result; } @@ -172,54 +169,53 @@ public abstract class BaseRegionObserver public long postIncrementColumnValue(final CoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final long amount, final boolean writeToWAL, long result) - throws IOException { + throws IOException { return result; } @Override - public Increment preIncrement(final CoprocessorEnvironment e, - final Increment increment) - throws IOException { - return increment; + public void preIncrement(final CoprocessorEnvironment e, + final Increment increment, final Result result) throws IOException { } @Override - public Result postIncrement(final CoprocessorEnvironment e, - final Increment increment, - final Result result) throws IOException { - return result; + public void postIncrement(final CoprocessorEnvironment e, + final Increment increment, final Result result) throws IOException { } @Override - public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan) - throws IOException { - return scan; + public InternalScanner preScannerOpen(final CoprocessorEnvironment e, + final Scan scan, final InternalScanner s) throws IOException { + return s; } @Override - public void postScannerOpen(final CoprocessorEnvironment e, - final Scan scan, - final long scannerId) throws IOException { } + public InternalScanner postScannerOpen(final CoprocessorEnvironment e, + final Scan scan, final InternalScanner s) throws IOException { + return s; + } @Override - public void preScannerNext(final CoprocessorEnvironment e, - final long scannerId) throws IOException { + public boolean preScannerNext(final CoprocessorEnvironment e, + final InternalScanner s, final List results, + final int limit, final boolean hasMore) throws IOException { + return hasMore; } @Override - public List postScannerNext(final CoprocessorEnvironment e, - final long scannerId, final List results) - throws IOException { - return results; + public boolean postScannerNext(final CoprocessorEnvironment e, + final InternalScanner s, final List results, final int limit, + final boolean hasMore) throws IOException { + return hasMore; } @Override public void preScannerClose(final CoprocessorEnvironment e, - final long scannerId) - throws IOException { } + final InternalScanner s) throws IOException { + } @Override public void postScannerClose(final CoprocessorEnvironment e, - final long scannerId) - throws IOException { } + final InternalScanner s) throws IOException { + } } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java?rev=1049471&r1=1049470&r2=1049471&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java Wed Dec 15 10:11:59 2010 @@ -45,27 +45,17 @@ public interface CoprocessorEnvironment */ public HTableInterface getTable(byte[] tableName) throws IOException; - // environment variables + /* Control flow changes */ /** - * Get an environment variable - * @param key the key - * @return the object corresponding to the environment variable, if set + * Causes framework to bypass default actions and return with the results + * from a preXXX chain. */ - public Object get(Object key); + public void bypass(); /** - * Set an environment variable - * @param key the key - * @param value the value + * Mark coprocessor chain processing as complete. Causes framework to return + * immediately without calling any additional chained coprocessors. */ - public void put(Object key, Object value); - - /** - * Remove an environment variable - * @param key the key - * @return the object corresponding to the environment variable, if set - */ - public Object remove(Object key); - + public void complete(); } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1049471&r1=1049470&r2=1049471&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Wed Dec 15 10:11:59 2010 @@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Re import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.InternalScanner; + import java.io.IOException; /** @@ -37,62 +39,92 @@ public interface RegionObserver { /** * Called before a client makes a GetClosestRowBefore request. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param row the row * @param family the family + * @param result The result to return to the client if default processing + * is bypassed. Can be modified. Will not be used if default processing + * is not bypassed. * @throws IOException if an error occurred on the coprocessor */ public void preGetClosestRowBefore(final CoprocessorEnvironment e, - final byte [] row, final byte [] family) + final byte [] row, final byte [] family, final Result result) throws IOException; /** * Called after a client makes a GetClosestRowBefore request. + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param row the row * @param family the desired family - * @param result the result set - * @return the possible tranformed result set to return to the client + * @param result the result to return to the client, modify as necessary * @throws IOException if an error occurred on the coprocessor */ - public Result postGetClosestRowBefore(final CoprocessorEnvironment e, + public void postGetClosestRowBefore(final CoprocessorEnvironment e, final byte [] row, final byte [] family, final Result result) throws IOException; /** - * Called before the client perform a get() + * Called before the client performs a Get + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param get the Get request - * @return the possibly transformed Get object by coprocessor + * @param result The result to return to the client if default processing + * is bypassed. Can be modified. Will not be used if default processing + * is not bypassed. * @throws IOException if an error occurred on the coprocessor */ - public Get preGet(final CoprocessorEnvironment e, final Get get) + public void preGet(final CoprocessorEnvironment e, final Get get, + final List result) throws IOException; /** - * Called after the client perform a get() + * Called after the client performs a Get + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param get the Get request - * @param results the result list - * @return the possibly transformed result list to return to client + * @param result the result to return to the client, modify as necessary * @throws IOException if an error occurred on the coprocessor */ - public List postGet(final CoprocessorEnvironment e, final Get get, - final List results) + public void postGet(final CoprocessorEnvironment e, final Get get, + final List result) throws IOException; /** * Called before the client tests for existence using a Get. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param get the Get request - * @return the possibly transformed Get object by coprocessor + * @param exists + * @return the value to return to the client if bypassing default processing * @throws IOException if an error occurred on the coprocessor */ - public Get preExists(final CoprocessorEnvironment e, final Get get) + public boolean preExists(final CoprocessorEnvironment e, final Get get, + final boolean exists) throws IOException; /** * Called after the client tests for existence using a Get. + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param get the Get request * @param exists the result returned by the region server @@ -105,64 +137,92 @@ public interface RegionObserver { /** * Called before the client stores a value. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server - * @param familyMap map of family to edits for the given family. - * @return the possibly transformed map to actually use + * @param familyMap map of family to edits for the given family + * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - public Map> prePut(final CoprocessorEnvironment e, - final Map> familyMap) + public void prePut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException; /** * Called after the client stores a value. + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server - * @param familyMap map of family to edits for the given family. + * @param familyMap map of family to edits for the given family + * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ public void postPut(final CoprocessorEnvironment e, final Map> familyMap) + List> familyMap, final boolean writeToWAL) throws IOException; /** * Called before the client deletes a value. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server - * @param familyMap map of family to edits for the given family. - * @return the possibly transformed map to actually use + * @param familyMap map of family to edits for the given family + * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - public Map> preDelete(final CoprocessorEnvironment e, - final Map> familyMap) + public void preDelete(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException; /** * Called after the client deletes a value. + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server - * @param familyMap map of family to edits for the given family. + * @param familyMap map of family to edits for the given family + * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ public void postDelete(final CoprocessorEnvironment e, - final Map> familyMap) + final Map> familyMap, final boolean writeToWAL) throws IOException; /** * Called before checkAndPut + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param row row to check * @param family column family * @param qualifier column qualifier * @param value the expected value * @param put data to put if check succeeds - * @return the possibly transformed map to actually use + * @param result + * @return the return value to return to client if bypassing default + * processing * @throws IOException if an error occurred on the coprocessor */ - public Put preCheckAndPut(final CoprocessorEnvironment e, + public boolean preCheckAndPut(final CoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, - final byte [] value, final Put put) + final byte [] value, final Put put, final boolean result) throws IOException; /** * Called after checkAndPut + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param row row to check * @param family column family @@ -170,7 +230,7 @@ public interface RegionObserver { * @param value the expected value * @param put data to put if check succeeds * @param result from the checkAndPut - * @return the possibly transformed value to return to client + * @return the possibly transformed return value to return to client * @throws IOException if an error occurred on the coprocessor */ public boolean postCheckAndPut(final CoprocessorEnvironment e, @@ -179,23 +239,32 @@ public interface RegionObserver { throws IOException; /** - * Called before checkAndPut + * Called before checkAndDelete + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param row row to check * @param family column family * @param qualifier column qualifier * @param value the expected value * @param delete delete to commit if check succeeds - * @return the possibly transformed map to actually use + * @param result + * @return the value to return to client if bypassing default processing * @throws IOException if an error occurred on the coprocessor */ - public Delete preCheckAndDelete(final CoprocessorEnvironment e, + public boolean preCheckAndDelete(final CoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, - final byte [] value, final Delete delete) + final byte [] value, final Delete delete, final boolean result) throws IOException; /** * Called after checkAndDelete + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param row row to check * @param family column family @@ -203,7 +272,7 @@ public interface RegionObserver { * @param value the expected value * @param delete delete to commit if check succeeds * @param result from the CheckAndDelete - * @return the possibly transformed value to return to client + * @return the possibly transformed returned value to return to client * @throws IOException if an error occurred on the coprocessor */ public boolean postCheckAndDelete(final CoprocessorEnvironment e, @@ -213,13 +282,18 @@ public interface RegionObserver { /** * Called before incrementColumnValue + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param row row to check * @param family column family * @param qualifier column qualifier * @param amount long amount to increment - * @param writeToWAL whether to write the increment to the WAL - * @return new amount to increment + * @param writeToWAL true if the change should be written to the WAL + * @return value to return to the client if bypassing default processing * @throws IOException if an error occurred on the coprocessor */ public long preIncrementColumnValue(final CoprocessorEnvironment e, @@ -229,12 +303,15 @@ public interface RegionObserver { /** * Called after incrementColumnValue + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param row row to check * @param family column family * @param qualifier column qualifier * @param amount long amount to increment - * @param writeToWAL whether to write the increment to the WAL + * @param writeToWAL true if the change should be written to the WAL * @param result the result returned by incrementColumnValue * @return the result to return to the client * @throws IOException if an error occurred on the coprocessor @@ -245,92 +322,137 @@ public interface RegionObserver { throws IOException; /** - * Called before incrementColumnValue + * Called before Increment + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param increment increment object - * @param writeToWAL whether to write the increment to the WAL - * @return new Increment instance + * @param result The result to return to the client if default processing + * is bypassed. Can be modified. Will not be used if default processing + * is not bypassed. + * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - public Increment preIncrement(final CoprocessorEnvironment e, - final Increment increment) + public void preIncrement(final CoprocessorEnvironment e, + final Increment increment, final Result result) throws IOException; /** * Called after increment + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param increment increment object - * @param writeToWAL whether to write the increment to the WAL - * @param result the result returned by increment - * @return the result to return to the client + * @param writeToWAL true if the change should be written to the WAL + * @param result the result returned by increment, can be modified * @throws IOException if an error occurred on the coprocessor */ - public Result postIncrement(final CoprocessorEnvironment e, + public void postIncrement(final CoprocessorEnvironment e, final Increment increment, final Result result) throws IOException; /** * Called before the client opens a new scanner. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param scan the Scan specification - * @return the possibly transformed Scan to actually use + * @param s if not null, the base scanner + * @return an InternalScanner instance to use instead of the base scanner if + * overriding default behavior, null otherwise * @throws IOException if an error occurred on the coprocessor */ - public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan) + public InternalScanner preScannerOpen(final CoprocessorEnvironment e, + final Scan scan, final InternalScanner s) throws IOException; /** * Called after the client opens a new scanner. + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server * @param scan the Scan specification - * @param scannerId the scanner id allocated by the region server + * @param s if not null, the base scanner + * @return the scanner instance to use * @throws IOException if an error occurred on the coprocessor */ - public void postScannerOpen(final CoprocessorEnvironment e, final Scan scan, - final long scannerId) + public InternalScanner postScannerOpen(final CoprocessorEnvironment e, + final Scan scan, final InternalScanner s) throws IOException; /** * Called before the client asks for the next row on a scanner. - * @param e the environment provided by the region server - * @param scannerId the scanner id - * @param results the result set returned by the region server - * @return the possibly transformed result set to actually return - * @throws IOException if an error occurred on the coprocessor - */ - public void preScannerNext(final CoprocessorEnvironment e, - final long scannerId) + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors + * @param e the environment provided by the region server + * @param s the scanner + * @param result The result to return to the client if default processing + * is bypassed. Can be modified. Will not be returned if default processing + * is not bypassed. + * @param limit the maximum number of results to return + * @param hasNext the 'has more' indication + * @return 'has more' indication that should be sent to client + * @throws IOException if an error occurred on the coprocessor + */ + public boolean preScannerNext(final CoprocessorEnvironment e, + final InternalScanner s, final List result, + final int limit, final boolean hasNext) throws IOException; /** * Called after the client asks for the next row on a scanner. - * @param e the environment provided by the region server - * @param scannerId the scanner id - * @param results the result set returned by the region server - * @return the possibly transformed result set to actually return + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors + * @param e the environment provided by the region server + * @param s the scanner + * @param result the result to return to the client, can be modified + * @param limit the maximum number of results to return + * @param hasNext the 'has more' indication + * @return 'has more' indication that should be sent to client * @throws IOException if an error occurred on the coprocessor */ - public List postScannerNext(final CoprocessorEnvironment e, - final long scannerId, final List results) + public boolean postScannerNext(final CoprocessorEnvironment e, + final InternalScanner s, final List result, final int limit, + final boolean hasNext) throws IOException; /** * Called before the client closes a scanner. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server - * @param scannerId the scanner id + * @param s the scanner * @throws IOException if an error occurred on the coprocessor */ public void preScannerClose(final CoprocessorEnvironment e, - final long scannerId) + final InternalScanner s) throws IOException; /** * Called after the client closes a scanner. + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors * @param e the environment provided by the region server - * @param scannerId the scanner id + * @param s the scanner * @throws IOException if an error occurred on the coprocessor */ public void postScannerClose(final CoprocessorEnvironment e, - final long scannerId) + final InternalScanner s) throws IOException; } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java?rev=1049471&r1=1049470&r2=1049471&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java Wed Dec 15 10:11:59 2010 @@ -47,6 +47,7 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -284,6 +285,14 @@ public class CoprocessorHost { } } + boolean shouldBypass() { + return bypass.getAndSet(false); + } + + boolean shouldComplete() { + return complete.getAndSet(false); + } + /** @return the coprocessor environment version */ @Override public int getVersion() { @@ -319,30 +328,14 @@ public class CoprocessorHost { return new HTableWrapper(tableName); } - /** - * @param key the key - * @return the value, or null if it does not exist - */ - @Override - public Object get(Object key) { - return vars.get(key); - } - - /** - * @param key the key - * @param value the value - */ @Override - public void put(Object key, Object value) { - vars.put(key, value); + public void complete() { + complete.set(true); } - /** - * @param key the key - */ @Override - public Object remove(Object key) { - return vars.remove(key); + public void bypass() { + bypass.set(true); } } @@ -355,8 +348,10 @@ public class CoprocessorHost { HRegion region; /** Ordered set of loaded coprocessors with lock */ final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock(); - Set coprocessors = + final Set coprocessors = new TreeSet(new EnvironmentPriorityComparator()); + final AtomicBoolean bypass = new AtomicBoolean(false); + final AtomicBoolean complete = new AtomicBoolean(false); /** * Constructor @@ -482,6 +477,7 @@ public class CoprocessorHost { * @param priority priority * @throws IOException Exception */ + @SuppressWarnings({ "unchecked", "rawtypes" }) public void load(Class implClass, Coprocessor.Priority priority) throws IOException { // create the instance @@ -581,6 +577,9 @@ public class CoprocessorHost { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { env.impl.preOpen(env); + if (env.shouldComplete()) { + break; + } } } finally { coprocessorLock.readLock().unlock(); @@ -595,6 +594,9 @@ public class CoprocessorHost { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { env.impl.postOpen(env); + if (env.shouldComplete()) { + break; + } } } finally { coprocessorLock.readLock().unlock(); @@ -610,7 +612,6 @@ public class CoprocessorHost { coprocessorLock.writeLock().lock(); for (Environment env: coprocessors) { env.impl.preClose(env, abortRequested); - env.shutdown(); } } finally { coprocessorLock.writeLock().unlock(); @@ -642,6 +643,9 @@ public class CoprocessorHost { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { env.impl.preCompact(env, willSplit); + if (env.shouldComplete()) { + break; + } } } finally { coprocessorLock.readLock().unlock(); @@ -657,6 +661,9 @@ public class CoprocessorHost { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { env.impl.postCompact(env, willSplit); + if (env.shouldComplete()) { + break; + } } } finally { coprocessorLock.readLock().unlock(); @@ -671,6 +678,9 @@ public class CoprocessorHost { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { env.impl.preFlush(env); + if (env.shouldComplete()) { + break; + } } } finally { coprocessorLock.readLock().unlock(); @@ -685,6 +695,9 @@ public class CoprocessorHost { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { env.impl.postFlush(env); + if (env.shouldComplete()) { + break; + } } } finally { coprocessorLock.readLock().unlock(); @@ -699,6 +712,9 @@ public class CoprocessorHost { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { env.impl.preSplit(env); + if (env.shouldComplete()) { + break; + } } } finally { coprocessorLock.readLock().unlock(); @@ -715,6 +731,9 @@ public class CoprocessorHost { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { env.impl.postSplit(env, l, r); + if (env.shouldComplete()) { + break; + } } } finally { coprocessorLock.readLock().unlock(); @@ -727,17 +746,25 @@ public class CoprocessorHost { * @param row the row key * @param family the family * @param result the result set from the region + * @return true if default processing should be bypassed * @exception IOException Exception */ - public void preGetClosestRowBefore(final byte[] row, final byte[] family) - throws IOException { + public boolean preGetClosestRowBefore(final byte[] row, final byte[] family, + final Result result) throws IOException { try { + boolean bypass = false; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preGetClosestRowBefore(env, row, family); + ((RegionObserver)env.impl).preGetClosestRowBefore(env, row, family, + result); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } + return bypass; } finally { coprocessorLock.readLock().unlock(); } @@ -747,20 +774,21 @@ public class CoprocessorHost { * @param row the row key * @param family the family * @param result the result set from the region - * @return the result set to return to the client * @exception IOException Exception */ - public Result postGetClosestRowBefore(final byte[] row, final byte[] family, - Result result) throws IOException { + public void postGetClosestRowBefore(final byte[] row, final byte[] family, + final Result result) throws IOException { try { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - result = ((RegionObserver)env.impl) - .postGetClosestRowBefore(env, row, family, result); + ((RegionObserver)env.impl).postGetClosestRowBefore(env, row, family, + result); + if (env.shouldComplete()) { + break; + } } } - return result; } finally { coprocessorLock.readLock().unlock(); } @@ -768,18 +796,24 @@ public class CoprocessorHost { /** * @param get the Get request - * @return the possibly transformed Get object by coprocessor + * @return true if default processing should be bypassed * @exception IOException Exception */ - public Get preGet(Get get) throws IOException { + public boolean preGet(final Get get, final List results) + throws IOException { try { + boolean bypass = false; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - get = ((RegionObserver)env.impl).preGet(env, get); + ((RegionObserver)env.impl).preGet(env, get, results); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } - return get; + return bypass; } finally { coprocessorLock.readLock().unlock(); } @@ -791,16 +825,18 @@ public class CoprocessorHost { * @return the possibly transformed result set to use * @exception IOException Exception */ - public List postGet(final Get get, List results) - throws IOException { + public void postGet(final Get get, final List results) + throws IOException { try { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - results = ((RegionObserver)env.impl).postGet(env, get, results); + ((RegionObserver)env.impl).postGet(env, get, results); + if (env.shouldComplete()) { + break; + } } } - return results; } finally { coprocessorLock.readLock().unlock(); } @@ -808,18 +844,25 @@ public class CoprocessorHost { /** * @param get the Get request - * @param exists the result returned by the region server + * @return true or false to return to client if bypassing normal operation, + * or null otherwise * @exception IOException Exception */ - public Get preExists(Get get) throws IOException { + public Boolean preExists(final Get get) throws IOException { try { + boolean bypass = false; + boolean exists = false; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - get = ((RegionObserver)env.impl).preExists(env, get); + exists = ((RegionObserver)env.impl).preExists(env, get, exists); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } - return get; + return bypass ? exists : null; } finally { coprocessorLock.readLock().unlock(); } @@ -837,7 +880,10 @@ public class CoprocessorHost { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - exists &= ((RegionObserver)env.impl).postExists(env, get, exists); + exists = ((RegionObserver)env.impl).postExists(env, get, exists); + if (env.shouldComplete()) { + break; + } } } return exists; @@ -848,19 +894,25 @@ public class CoprocessorHost { /** * @param familyMap map of family to edits for the given family. - * @return the possibly transformed map to actually use + * @param writeToWAL true if the change should be written to the WAL + * @return true if default processing should be bypassed * @exception IOException Exception */ - public Map> prePut(Map> familyMap) - throws IOException { + public boolean prePut(final Map> familyMap, + final boolean writeToWAL) throws IOException { try { + boolean bypass = false; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - familyMap = ((RegionObserver)env.impl).prePut(env, familyMap); + ((RegionObserver)env.impl).prePut(env, familyMap, writeToWAL); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } - return familyMap; + return bypass; } finally { coprocessorLock.readLock().unlock(); } @@ -868,15 +920,19 @@ public class CoprocessorHost { /** * @param familyMap map of family to edits for the given family. + * @param writeToWAL true if the change should be written to the WAL * @exception IOException Exception */ - public void postPut(Map> familyMap) - throws IOException { + public void postPut(final Map> familyMap, + final boolean writeToWAL) throws IOException { try { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postPut(env, familyMap); + ((RegionObserver)env.impl).postPut(env, familyMap, writeToWAL); + if (env.shouldComplete()) { + break; + } } } } finally { @@ -886,19 +942,25 @@ public class CoprocessorHost { /** * @param familyMap map of family to edits for the given family. - * @return the possibly transformed map to actually use + * @param writeToWAL true if the change should be written to the WAL + * @return true if default processing should be bypassed * @exception IOException Exception */ - public Map> preDelete(Map> familyMap) - throws IOException { + public boolean preDelete(final Map> familyMap, + final boolean writeToWAL) throws IOException { try { + boolean bypass = false; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - familyMap = ((RegionObserver)env.impl).preDelete(env, familyMap); + ((RegionObserver)env.impl).preDelete(env, familyMap, writeToWAL); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } - return familyMap; + return bypass; } finally { coprocessorLock.readLock().unlock(); } @@ -906,15 +968,19 @@ public class CoprocessorHost { /** * @param familyMap map of family to edits for the given family. + * @param writeToWAL true if the change should be written to the WAL * @exception IOException Exception */ - public void postDelete(Map> familyMap) - throws IOException { + public void postDelete(final Map> familyMap, + final boolean writeToWAL) throws IOException { try { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postDelete(env, familyMap); + ((RegionObserver)env.impl).postDelete(env, familyMap, writeToWAL); + if (env.shouldComplete()) { + break; + } } } } finally { @@ -928,21 +994,29 @@ public class CoprocessorHost { * @param qualifier column qualifier * @param value the expected value * @param put data to put if check succeeds + * @return true or false to return to client if default processing should + * be bypassed, or null otherwise * @throws IOException e */ - public Put preCheckAndPut(final byte [] row, final byte [] family, + public Boolean preCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, Put put) throws IOException { try { + boolean bypass = false; + boolean result = false; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - put = ((RegionObserver)env.impl).preCheckAndPut(env, row, family, - qualifier, value, put); + result = ((RegionObserver)env.impl).preCheckAndPut(env, row, family, + qualifier, value, put, result); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } - return put; + return bypass ? result : null; } finally { coprocessorLock.readLock().unlock(); } @@ -967,6 +1041,9 @@ public class CoprocessorHost { if (env.impl instanceof RegionObserver) { result = ((RegionObserver)env.impl).postCheckAndPut(env, row, family, qualifier, value, put, result); + if (env.shouldComplete()) { + break; + } } } return result; @@ -981,21 +1058,29 @@ public class CoprocessorHost { * @param qualifier column qualifier * @param value the expected value * @param delete delete to commit if check succeeds + * @return true or false to return to client if default processing should + * be bypassed, or null otherwise * @throws IOException e */ - public Delete preCheckAndDelete(final byte [] row, final byte [] family, + public Boolean preCheckAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, Delete delete) throws IOException { try { + boolean bypass = false; + boolean result = false; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - delete = ((RegionObserver)env.impl).preCheckAndDelete(env, row, - family, qualifier, value, delete); + result = ((RegionObserver)env.impl).preCheckAndDelete(env, row, + family, qualifier, value, delete, result); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } - return delete; + return bypass ? result : null; } finally { coprocessorLock.readLock().unlock(); } @@ -1020,6 +1105,9 @@ public class CoprocessorHost { if (env.impl instanceof RegionObserver) { result = ((RegionObserver)env.impl).postCheckAndDelete(env, row, family, qualifier, value, delete, result); + if (env.shouldComplete()) { + break; + } } } return result; @@ -1033,25 +1121,31 @@ public class CoprocessorHost { * @param family column family * @param qualifier column qualifier * @param amount long amount to increment - * @param writeToWAL whether to write the increment to the WAL - * @return new amount to increment + * @param writeToWAL true if the change should be written to the WAL + * @return return value for client if default operation should be bypassed, + * or null otherwise * @throws IOException if an error occurred on the coprocessor */ - public long preIncrementColumnValue(final byte [] row, final byte [] family, + public Long preIncrementColumnValue(final byte [] row, final byte [] family, final byte [] qualifier, long amount, final boolean writeToWAL) throws IOException { try { + boolean bypass = false; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { amount = ((RegionObserver)env.impl).preIncrementColumnValue(env, - row, family, qualifier, amount, writeToWAL); + row, family, qualifier, amount, writeToWAL); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } + return bypass ? amount : null; } finally { coprocessorLock.readLock().unlock(); } - return amount; } /** @@ -1059,7 +1153,7 @@ public class CoprocessorHost { * @param family column family * @param qualifier column qualifier * @param amount long amount to increment - * @param writeToWAL whether to write the increment to the WAL + * @param writeToWAL true if the change should be written to the WAL * @param result the result returned by incrementColumnValue * @return the result to return to the client * @throws IOException if an error occurred on the coprocessor @@ -1072,7 +1166,10 @@ public class CoprocessorHost { for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { result = ((RegionObserver)env.impl).postIncrementColumnValue(env, - row, family, qualifier, amount, writeToWAL, result); + row, family, qualifier, amount, writeToWAL, result); + if (env.shouldComplete()) { + break; + } } } } finally { @@ -1083,155 +1180,202 @@ public class CoprocessorHost { /** * @param increment increment object - * @param writeToWAL whether to write the increment to the WAL - * @return new amount to increment + * @param writeToWAL true if the change should be written to the WAL + * @return result to return to client if default operation should be + * bypassed, null otherwise * @throws IOException if an error occurred on the coprocessor */ - public Increment preIncrement(Increment increment) + public Result preIncrement(Increment increment) throws IOException { try { + boolean bypass = false; + Result result = new Result(); coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - increment = ((RegionObserver)env.impl).preIncrement(env, increment); + ((RegionObserver)env.impl).preIncrement(env, increment, result); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } + return bypass ? result : null; } finally { coprocessorLock.readLock().unlock(); } - return increment; } /** * @param increment increment object - * @param writeToWAL whether to write the increment to the WAL + * @param writeToWAL true if the change should be written to the WAL * @param result the result returned by incrementColumnValue - * @return the result to return to the client * @throws IOException if an error occurred on the coprocessor */ - public Result postIncrement(final Increment increment, Result result) + public void postIncrement(final Increment increment, Result result) throws IOException { try { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - result = ((RegionObserver)env.impl).postIncrement(env, increment, - result); + ((RegionObserver)env.impl).postIncrement(env, increment, result); + if (env.shouldComplete()) { + break; + } } } } finally { coprocessorLock.readLock().unlock(); } - return result; } /** * @param scan the Scan specification + * @return scanner id to return to client if default operation should be + * bypassed, false otherwise * @exception IOException Exception */ - public Scan preScannerOpen(Scan scan) throws IOException { + public InternalScanner preScannerOpen(Scan scan) throws IOException { try { + boolean bypass = false; + InternalScanner s = null; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - scan = ((RegionObserver)env.impl).preScannerOpen(env, scan); + s = ((RegionObserver)env.impl).preScannerOpen(env, scan, s); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } + return bypass ? s : null; } finally { coprocessorLock.readLock().unlock(); } - return scan; } /** * @param scan the Scan specification * @param scannerId the scanner id allocated by the region server + * @return the scanner instance to use * @exception IOException Exception */ - public void postScannerOpen(final Scan scan, long scannerId) + public InternalScanner postScannerOpen(final Scan scan, InternalScanner s) throws IOException { try { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postScannerOpen(env, scan, scannerId); + s = ((RegionObserver)env.impl).postScannerOpen(env, scan, s); + if (env.shouldComplete()) { + break; + } } } + return s; } finally { coprocessorLock.readLock().unlock(); } } /** - * @param scannerId the scanner id + * @param s the scanner * @param results the result set returned by the region server - * @return the possibly transformed result set to actually return + * @param limit the maximum number of results to return + * @return 'has next' indication to client if bypassing default behavior, or + * null otherwise * @exception IOException Exception */ - public void preScannerNext(final long scannerId) throws IOException { + public Boolean preScannerNext(final InternalScanner s, + final List results, int limit) throws IOException { try { + boolean bypass = false; + boolean hasNext = false; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preScannerNext(env, scannerId); + hasNext = ((RegionObserver)env.impl).preScannerNext(env, s, results, + limit, hasNext); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } + return bypass ? hasNext : null; } finally { coprocessorLock.readLock().unlock(); } } /** - * @param scannerId the scanner id + * @param s the scanner * @param results the result set returned by the region server - * @return the possibly transformed result set to actually return + * @param limit the maximum number of results to return + * @param hasMore + * @return 'has more' indication to give to client * @exception IOException Exception */ - public List postScannerNext(final long scannerId, - List results) throws IOException { + public boolean postScannerNext(final InternalScanner s, + final List results, final int limit, boolean hasMore) + throws IOException { try { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - results = ((RegionObserver)env.impl).postScannerNext(env, scannerId, - results); + hasMore = ((RegionObserver)env.impl).postScannerNext(env, s, + results, limit, hasMore); + if (env.shouldComplete()) { + break; + } } } - return results; + return hasMore; } finally { coprocessorLock.readLock().unlock(); } } /** - * @param scannerId the scanner id + * @param s the scanner + * @return true if default behavior should be bypassed, false otherwise * @exception IOException Exception */ - public void preScannerClose(final long scannerId) + public boolean preScannerClose(final InternalScanner s) throws IOException { try { + boolean bypass = false; coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preScannerClose(env, scannerId); + ((RegionObserver)env.impl).preScannerClose(env, s); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } } } + return bypass; } finally { coprocessorLock.readLock().unlock(); } } /** - * @param scannerId the scanner id + * @param s the scanner * @exception IOException Exception */ - public void postScannerClose(final long scannerId) + public void postScannerClose(final InternalScanner s) throws IOException { try { coprocessorLock.readLock().lock(); for (Environment env: coprocessors) { if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postScannerClose(env, scannerId); + ((RegionObserver)env.impl).postScannerClose(env, s); + if (env.shouldComplete()) { + break; + } } } } finally { Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1049471&r1=1049470&r2=1049471&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Dec 15 10:11:59 2010 @@ -1112,27 +1112,29 @@ public class HRegion implements HeapSize */ public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException { - Result result = null; + if (coprocessorHost != null) { + Result result = new Result(); + if (coprocessorHost.preGetClosestRowBefore(row, family, result)) { + return result; + } + } // look across all the HStores for this region and determine what the // closest key is across all column families, since the data may be sparse - KeyValue key = null; checkRow(row); startRegionOperation(); - if (coprocessorHost != null) { - coprocessorHost.preGetClosestRowBefore(row, family); - } try { Store store = getStore(family); KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP); // get the closest key. (HStore.getRowKeyAtOrBefore can return null) - key = store.getRowKeyAtOrBefore(kv); + KeyValue key = store.getRowKeyAtOrBefore(kv); + Result result = null; if (key != null) { Get get = new Get(key.getRow()); get.addFamily(family); result = get(get, null); } if (coprocessorHost != null) { - result = coprocessorHost.postGetClosestRowBefore(row, family, result); + coprocessorHost.postGetClosestRowBefore(row, family, result); } return result; } finally { @@ -1150,8 +1152,7 @@ public class HRegion implements HeapSize * @return InternalScanner * @throws IOException read exceptions */ - public InternalScanner getScanner(Scan scan) - throws IOException { + public InternalScanner getScanner(Scan scan) throws IOException { return getScanner(scan, null); } @@ -1175,13 +1176,17 @@ public class HRegion implements HeapSize } } - protected InternalScanner instantiateInternalScanner(Scan scan, List additionalScanners) throws IOException { + protected InternalScanner instantiateInternalScanner(Scan scan, + List additionalScanners) throws IOException { + InternalScanner s = null; if (coprocessorHost != null) { - coprocessorHost.preScannerOpen(scan); + s = coprocessorHost.preScannerOpen(scan); + } + if (s == null) { + s = new RegionScanner(scan, additionalScanners); } - InternalScanner s = new RegionScanner(scan, additionalScanners); if (coprocessorHost != null) { - coprocessorHost.postScannerOpen(scan, s.hashCode()); + s = coprocessorHost.postScannerOpen(scan, s); } return s; } @@ -1243,17 +1248,20 @@ public class HRegion implements HeapSize * @throws IOException */ public void delete(Map> familyMap, boolean writeToWAL) - throws IOException { + throws IOException { + /* Run coprocessor pre hook outside of locks to avoid deadlock */ + if (coprocessorHost != null) { + if (coprocessorHost.preDelete(familyMap, writeToWAL)) { + return; + } + } + long now = EnvironmentEdgeManager.currentTimeMillis(); byte [] byteNow = Bytes.toBytes(now); boolean flush = false; updatesLock.readLock().lock(); try { - if (coprocessorHost != null) { - familyMap = coprocessorHost.preDelete(familyMap); - } - for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); @@ -1318,7 +1326,7 @@ public class HRegion implements HeapSize flush = isFlushSize(memstoreSize.addAndGet(addedSize)); if (coprocessorHost != null) { - coprocessorHost.postDelete(familyMap); + coprocessorHost.postDelete(familyMap, writeToWAL); } } finally { this.updatesLock.readLock().unlock(); @@ -1456,15 +1464,36 @@ public class HRegion implements HeapSize return batchOp.retCodes; } + @SuppressWarnings("unchecked") private long doMiniBatchPut(BatchOperationInProgress> batchOp) throws IOException { + /* Run coprocessor pre hook outside of locks to avoid deadlock */ + if (coprocessorHost != null) { + List> ops = + new ArrayList>(batchOp.operations.length); + for (int i = 0; i < batchOp.operations.length; i++) { + Pair nextPair = batchOp.operations[i]; + Put put = nextPair.getFirst(); + Map> familyMap = put.getFamilyMap(); + if (coprocessorHost.prePut(familyMap, put.getWriteToWAL())) { + // pre hook says skip this Put + // adjust nextIndexToProcess if we skipped before it + if (batchOp.nextIndexToProcess > i) { + batchOp.nextIndexToProcess--; + } + continue; + } + ops.add(nextPair); + } + batchOp.operations = ops.toArray(new Pair[ops.size()]); + } + long now = EnvironmentEdgeManager.currentTimeMillis(); byte[] byteNow = Bytes.toBytes(now); /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); // reference family maps directly so coprocessors can mutate them if desired - Map>[] familyMaps = - new Map[batchOp.operations.length]; + Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; @@ -1481,12 +1510,6 @@ public class HRegion implements HeapSize Integer providedLockId = nextPair.getSecond(); Map> familyMap = put.getFamilyMap(); - // Check any loaded coprocessors - /* TODO: we should catch any throws coprocessor exceptions here to allow the - rest of the batch to continue. This means fixing HBASE-2898 */ - if (coprocessorHost != null) { - familyMap = coprocessorHost.prePut(familyMap); - } // store the family map reference to allow for mutations familyMaps[lastIndexExclusive] = familyMap; @@ -1555,15 +1578,22 @@ public class HRegion implements HeapSize long addedSize = 0; for (int i = firstIndex; i < lastIndexExclusive; i++) { if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue; - addedSize += applyFamilyMapToMemstore(familyMaps[i]); batchOp.retCodes[i] = OperationStatusCode.SUCCESS; + } - // execute any coprocessor post-hooks - if (coprocessorHost != null) { - coprocessorHost.postDelete(familyMaps[i]); + // ------------------------------------ + // STEP 5. Run coprocessor post hooks + // ------------------------------------ + if (coprocessorHost != null) { + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // only for successful puts + if (batchOp.retCodes[i] != OperationStatusCode.SUCCESS) continue; + Put p = batchOp.operations[i].getFirst(); + coprocessorHost.postPut(familyMaps[i], p.getWriteToWAL()); } } + success = true; return addedSize; } finally { @@ -1738,8 +1768,14 @@ public class HRegion implements HeapSize * @param writeToWAL if true, then we should write to the log * @throws IOException */ - private void put(Map> familyMap, - boolean writeToWAL) throws IOException { + private void put(Map> familyMap, boolean writeToWAL) + throws IOException { + /* run pre put hook outside of lock to avoid deadlock */ + if (coprocessorHost != null) { + if (coprocessorHost.prePut(familyMap, writeToWAL)) { + return; + } + } long now = EnvironmentEdgeManager.currentTimeMillis(); byte[] byteNow = Bytes.toBytes(now); @@ -1747,9 +1783,6 @@ public class HRegion implements HeapSize this.updatesLock.readLock().lock(); try { - if (coprocessorHost != null) { - familyMap = coprocessorHost.prePut(familyMap); - } checkFamilies(familyMap.keySet()); updateKVTimestamps(familyMap.values(), byteNow); // write/sync to WAL should happen before we touch memstore. @@ -1766,13 +1799,14 @@ public class HRegion implements HeapSize long addedSize = applyFamilyMapToMemstore(familyMap); flush = isFlushSize(memstoreSize.addAndGet(addedSize)); - - if (coprocessorHost != null) { - coprocessorHost.postPut(familyMap); - } } finally { this.updatesLock.readLock().unlock(); } + + if (coprocessorHost != null) { + coprocessorHost.postPut(familyMap, writeToWAL); + } + if (flush) { // Request a cache flush. Do it outside update lock. requestFlush(); @@ -2350,6 +2384,14 @@ public class HRegion implements HeapSize public synchronized boolean next(List outResults, int limit) throws IOException { + if (coprocessorHost != null) { + Boolean result = coprocessorHost.preScannerNext((InternalScanner)this, + outResults, limit); + if (result != null) { + return result.booleanValue(); + } + } + if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + "after we renewed it. Could be caused by a very slow scanner " + @@ -2363,14 +2405,11 @@ public class HRegion implements HeapSize results.clear(); - if (coprocessorHost != null) { - coprocessorHost.preScannerNext(hashCode()); - } - boolean returnResult = nextInternal(limit); if (coprocessorHost != null) { - results = coprocessorHost.postScannerNext(hashCode(), results); + returnResult = coprocessorHost.postScannerNext((InternalScanner)this, + results, limit, returnResult); } outResults.addAll(results); @@ -2416,8 +2455,10 @@ public class HRegion implements HeapSize do { this.storeHeap.next(results, limit - results.size()); if (limit > 0 && results.size() == limit) { - if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException( + if (this.filter != null && filter.hasFilterRow()) { + throw new IncompatibleFilterException( "Filter with filterRow(List) incompatible with scan with limit!"); + } return true; // we are expecting more yes, but also limited to how many we can return. } } while (Bytes.equals(currentRow, nextRow = peekRow())); @@ -2480,7 +2521,9 @@ public class HRegion implements HeapSize public synchronized void close() throws IOException { if (coprocessorHost != null) { - coprocessorHost.preScannerClose(hashCode()); + if (coprocessorHost.preScannerClose((InternalScanner)this)) { + return; + } } if (storeHeap != null) { storeHeap.close(); @@ -2488,7 +2531,7 @@ public class HRegion implements HeapSize } this.filterClosed = true; if (coprocessorHost != null) { - coprocessorHost.postScannerClose(hashCode()); + coprocessorHost.postScannerClose((InternalScanner)this); } } } @@ -3064,32 +3107,27 @@ public class HRegion implements HeapSize throws IOException { Scan scan = new Scan(get); - List results = null; - List getResults = new ArrayList(); + List results = new ArrayList(); // pre-get CP hook if (withCoprocessor && (coprocessorHost != null)) { - get = coprocessorHost.preGet(get); + if (coprocessorHost.preGet(get, results)) { + return results; + } } InternalScanner scanner = null; try { scanner = getScanner(scan); - scanner.next(getResults); + scanner.next(results); } finally { if (scanner != null) scanner.close(); } - // append get results to pre-get results - if (results != null){ - results.addAll(getResults); - } - else { - results = getResults; - } + // post-get CP hook if (withCoprocessor && (coprocessorHost != null)) { - results = coprocessorHost.postGet(get, results); + coprocessorHost.postGet(get, results); } return results; Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1049471&r1=1049470&r2=1049471&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Dec 15 10:11:59 2010 @@ -1599,7 +1599,10 @@ public class HRegionServer implements HR try { HRegion region = getRegion(regionName); if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().preExists(get); + Boolean result = region.getCoprocessorHost().preExists(get); + if (result != null) { + return result.booleanValue(); + } } Result r = region.get(get, getLockFromId(get.getLockId())); boolean result = r != null && !r.isEmpty(); @@ -1702,8 +1705,11 @@ public class HRegionServer implements HR } HRegion region = getRegion(regionName); if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().preCheckAndPut(row, family, qualifier, - value, put); + Boolean result = region.getCoprocessorHost() + .preCheckAndPut(row, family, qualifier, value, put); + if (result != null) { + return result.booleanValue(); + } } boolean result = checkAndMutate(regionName, row, family, qualifier, value, put, getLockFromId(put.getLockId())); @@ -1737,8 +1743,11 @@ public class HRegionServer implements HR } HRegion region = getRegion(regionName); if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, - value, delete); + Boolean result = region.getCoprocessorHost().preCheckAndDelete(row, + family, qualifier, value, delete); + if (result != null) { + return result.booleanValue(); + } } boolean result = checkAndMutate(regionName, row, family, qualifier, value, delete, getLockFromId(delete.getLockId())); @@ -2462,12 +2471,15 @@ public class HRegionServer implements HR Increment incVal = increment; Result resVal; if (region.getCoprocessorHost() != null) { - incVal = region.getCoprocessorHost().preIncrement(incVal); + resVal = region.getCoprocessorHost().preIncrement(incVal); + if (resVal != null) { + return resVal; + } } resVal = region.increment(incVal, getLockFromId(increment.getLockId()), increment.getWriteToWAL()); if (region.getCoprocessorHost() != null) { - resVal = region.getCoprocessorHost().postIncrement(incVal, resVal); + region.getCoprocessorHost().postIncrement(incVal, resVal); } return resVal; } catch (IOException e) { @@ -2489,16 +2501,18 @@ public class HRegionServer implements HR requestCount.incrementAndGet(); try { HRegion region = getRegion(regionName); - long amountVal = amount; if (region.getCoprocessorHost() != null) { - amountVal = region.getCoprocessorHost().preIncrementColumnValue(row, - family, qualifier, amountVal, writeToWAL); + Long amountVal = region.getCoprocessorHost().preIncrementColumnValue(row, + family, qualifier, amount, writeToWAL); + if (amountVal != null) { + return amountVal.longValue(); + } } long retval = region.incrementColumnValue(row, family, qualifier, amount, - writeToWAL); + writeToWAL); if (region.getCoprocessorHost() != null) { retval = region.getCoprocessorHost().postIncrementColumnValue(row, - family, qualifier, amountVal, writeToWAL, retval); + family, qualifier, amount, writeToWAL, retval); } return retval; } catch (IOException e) { Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1049471&r1=1049470&r2=1049471&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Wed Dec 15 10:11:59 2010 @@ -34,11 +34,8 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; -import static org.junit.Assert.*; - /** * A sample region observer that tests the RegionObserver interface. * It works with TestRegionObserverInterface to provide the test case. @@ -59,22 +56,26 @@ public class SimpleRegionObserver extend boolean hadPreIncrement = false; boolean hadPostIncrement = false; - - // Overriden RegionObserver methods @Override - public Get preGet(CoprocessorEnvironment e, Get get) { + public void preGet(final CoprocessorEnvironment e, final Get get, + final List results) throws IOException { + assertNotNull(e); + assertNotNull(e.getRegion()); + assertNotNull(get); + assertNotNull(results); if (Arrays.equals(e.getRegion().getTableDesc().getName(), TestRegionObserverInterface.TEST_TABLE)) { hadPreGet = true; - assertNotNull(e); - assertNotNull(e.getRegion()); } - return get; } @Override - public List postGet(CoprocessorEnvironment e, Get get, - List results) { + public void postGet(final CoprocessorEnvironment e, final Get get, + final List results) { + assertNotNull(e); + assertNotNull(e.getRegion()); + assertNotNull(get); + assertNotNull(results); if (Arrays.equals(e.getRegion().getTableDesc().getName(), TestRegionObserverInterface.TEST_TABLE)) { boolean foundA = false; @@ -96,12 +97,14 @@ public class SimpleRegionObserver extend assertTrue(foundC); hadPostGet = true; } - return results; } @Override - public Map> prePut(CoprocessorEnvironment e, - Map> familyMap) { + public void prePut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { + assertNotNull(e); + assertNotNull(e.getRegion()); + assertNotNull(familyMap); if (Arrays.equals(e.getRegion().getTableDesc().getName(), TestRegionObserverInterface.TEST_TABLE)) { List kvs = familyMap.get(TestRegionObserverInterface.A); @@ -121,12 +124,14 @@ public class SimpleRegionObserver extend TestRegionObserverInterface.C)); hadPrePut = true; } - return familyMap; } @Override - public void postPut(CoprocessorEnvironment e, - Map> familyMap) { + public void postPut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { + assertNotNull(e); + assertNotNull(e.getRegion()); + assertNotNull(familyMap); List kvs = familyMap.get(TestRegionObserverInterface.A); if (Arrays.equals(e.getRegion().getTableDesc().getName(), TestRegionObserverInterface.TEST_TABLE)) { @@ -149,18 +154,23 @@ public class SimpleRegionObserver extend } @Override - public Map> preDelete(CoprocessorEnvironment e, - Map> familyMap) { + public void preDelete(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { + assertNotNull(e); + assertNotNull(e.getRegion()); + assertNotNull(familyMap); if (beforeDelete && e.getRegion().getTableDesc().getName().equals( TestRegionObserverInterface.TEST_TABLE)) { hadPreDeleted = true; } - return familyMap; } @Override - public void postDelete(CoprocessorEnvironment e, - Map> familyMap) { + public void postDelete(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { + assertNotNull(e); + assertNotNull(e.getRegion()); + assertNotNull(familyMap); if (Arrays.equals(e.getRegion().getTableDesc().getName(), TestRegionObserverInterface.TEST_TABLE)) { beforeDelete = false; @@ -170,7 +180,12 @@ public class SimpleRegionObserver extend @Override public void preGetClosestRowBefore(final CoprocessorEnvironment e, - final byte[] row, final byte[] family) { + final byte[] row, final byte[] family, final Result result) + throws IOException { + assertNotNull(e); + assertNotNull(e.getRegion()); + assertNotNull(row); + assertNotNull(result); if (beforeDelete && e.getRegion().getTableDesc().getName().equals( TestRegionObserverInterface.TEST_TABLE)) { hadPreGetClosestRowBefore = true; @@ -178,70 +193,35 @@ public class SimpleRegionObserver extend } @Override - public Result postGetClosestRowBefore(final CoprocessorEnvironment e, - final byte[] row, final byte[] family, Result result) { + public void postGetClosestRowBefore(final CoprocessorEnvironment e, + final byte[] row, final byte[] family, final Result result) + throws IOException { + assertNotNull(e); + assertNotNull(e.getRegion()); + assertNotNull(row); + assertNotNull(result); if (Arrays.equals(e.getRegion().getTableDesc().getName(), TestRegionObserverInterface.TEST_TABLE)) { hadPostGetClosestRowBefore = true; } - return result; - } - - @Override - public Scan preScannerOpen(CoprocessorEnvironment e, Scan scan) { - // not tested -- need to go through the RS to get here - return scan; - } - - @Override - public void postScannerOpen(CoprocessorEnvironment e, Scan scan, - long scannerId) { - // not tested -- need to go through the RS to get here } @Override - public void preScannerNext(final CoprocessorEnvironment e, - final long scannerId) { - // not tested -- need to go through the RS to get here - } - - @Override - public List postScannerNext(final CoprocessorEnvironment e, - final long scannerId, List results) { - // not tested -- need to go through the RS to get here - return results; - } - - @Override - public void preScannerClose(final CoprocessorEnvironment e, - final long scannerId) { - // not tested -- need to go through the RS to get here - } - - @Override - public void postScannerClose(final CoprocessorEnvironment e, - final long scannerId) { - // not tested -- need to go through the RS to get here - } - - @Override - public Increment preIncrement(CoprocessorEnvironment e, Increment increment) - throws IOException { + public void preIncrement(final CoprocessorEnvironment e, + final Increment increment, final Result result) throws IOException { if (Arrays.equals(e.getRegion().getTableDesc().getName(), TestRegionObserverInterface.TEST_TABLE_2)) { hadPreIncrement = true; } - return increment; } @Override - public Result postIncrement(CoprocessorEnvironment e, Increment increment, - Result result) throws IOException { + public void postIncrement(final CoprocessorEnvironment e, + final Increment increment, final Result result) throws IOException { if (Arrays.equals(e.getRegion().getTableDesc().getName(), TestRegionObserverInterface.TEST_TABLE_2)) { hadPostIncrement = true; } - return result; } boolean hadPreGet() { Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java?rev=1049471&r1=1049470&r2=1049471&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java Wed Dec 15 10:11:59 2010 @@ -46,8 +46,9 @@ public class TestRegionObserverStacking public static class ObserverA extends BaseRegionObserverCoprocessor { long id; @Override - public void postPut(final CoprocessorEnvironment e, - Map> familyMap) { + public void postPut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) + throws IOException { id = System.currentTimeMillis(); try { Thread.sleep(10); @@ -59,8 +60,9 @@ public class TestRegionObserverStacking public static class ObserverB extends BaseRegionObserverCoprocessor { long id; @Override - public void postPut(final CoprocessorEnvironment e, - Map> familyMap) { + public void postPut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) + throws IOException { id = System.currentTimeMillis(); try { Thread.sleep(10); @@ -73,8 +75,9 @@ public class TestRegionObserverStacking long id; @Override - public void postPut(final CoprocessorEnvironment e, - Map> familyMap) { + public void postPut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) + throws IOException { id = System.currentTimeMillis(); try { Thread.sleep(10);