hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1387001 [1/5] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/client/coprocessor/ main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/coprocessor/...
Date Tue, 18 Sep 2012 06:32:58 GMT
Author: garyh
Date: Tue Sep 18 06:32:57 2012
New Revision: 1387001

URL: http://svn.apache.org/viewvc?rev=1387001&view=rev
Log:
HBASE-5448  Support dynamic coprocessor endpoints with protobuf based RPC

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorService.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/generated/
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/generated/ExampleProtos.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java
    hbase/trunk/hbase-server/src/main/protobuf/Examples.proto
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/package-info.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorProtocol.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AccessControlProtos.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
    hbase/trunk/hbase-server/src/main/protobuf/AccessControl.proto
    hbase/trunk/hbase-server/src/main/protobuf/Client.proto
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Sep 18 06:32:57 2012
@@ -2140,6 +2140,7 @@ public class HConnectionManager {
      * @param <R> the callable's return type
      * @throws IOException
      */
+    @Deprecated
     public <T extends CoprocessorProtocol,R> void processExecs(
         final Class<T> protocol,
         List<byte[]> rows,

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Sep 18 06:32:57 2012
@@ -32,11 +32,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.protobuf.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -56,6 +62,7 @@ import org.apache.hadoop.hbase.client.co
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.io.DataInputInputStream;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -1318,6 +1325,7 @@ public class HTable implements HTableInt
    * {@inheritDoc}
    */
   @Override
+  @Deprecated
   public <T extends CoprocessorProtocol> T coprocessorProxy(
       Class<T> protocol, byte[] row) {
     return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(),
@@ -1332,7 +1340,15 @@ public class HTable implements HTableInt
   /**
    * {@inheritDoc}
    */
+  public CoprocessorRpcChannel coprocessorService(byte[] row) {
+    return new CoprocessorRpcChannel(connection, tableName, row);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
   @Override
+  @Deprecated
   public <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
       Class<T> protocol, byte[] startKey, byte[] endKey,
       Batch.Call<T,R> callable)
@@ -1353,6 +1369,7 @@ public class HTable implements HTableInt
    * {@inheritDoc}
    */
   @Override
+  @Deprecated
   public <T extends CoprocessorProtocol, R> void coprocessorExec(
       Class<T> protocol, byte[] startKey, byte[] endKey,
       Batch.Call<T,R> callable, Batch.Callback<R> callback)
@@ -1364,6 +1381,75 @@ public class HTable implements HTableInt
         callback);
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
+      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
+      throws ServiceException, Throwable {
+    final Map<byte[],R> results =  new ConcurrentSkipListMap<byte[], R>(Bytes.BYTES_COMPARATOR);
+    coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
+      public void update(byte[] region, byte[] row, R value) {
+        if (value == null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Call to " + service.getName() +
+                " received NULL value from Batch.Call for region " + Bytes.toStringBinary(region));
+          }
+        } else {
+          results.put(region, value);
+        }
+      }
+    });
+    return results;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public <T extends Service, R> void coprocessorService(final Class<T> service,
+      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
+      final Batch.Callback<R> callback) throws ServiceException, Throwable {
+
+    // get regions covered by the row range
+    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
+
+    Map<byte[],Future<R>> futures =
+        new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
+    for (final byte[] r : keys) {
+      final CoprocessorRpcChannel channel =
+          new CoprocessorRpcChannel(connection, tableName, r);
+      Future<R> future = pool.submit(
+          new Callable<R>() {
+            public R call() throws Exception {
+              T instance = ProtobufUtil.newServiceStub(service, channel);
+              R result = callable.call(instance);
+              byte[] region = channel.getLastRegion();
+              if (callback != null) {
+                callback.update(region, r, result);
+              }
+              return result;
+            }
+          });
+      futures.put(r, future);
+    }
+    for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
+      try {
+        e.getValue().get();
+      } catch (ExecutionException ee) {
+        LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
+            + Bytes.toStringBinary(e.getKey()), ee);
+        throw ee.getCause();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
+            + " for row " + Bytes.toStringBinary(e.getKey()))
+            .initCause(ie);
+      }
+    }
+  }
+
   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
   throws IOException {
     Pair<byte[][],byte[][]> startEndKeys = getStartEndKeys();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Tue Sep 18 06:32:57 2012
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -30,6 +32,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 
 /**
  * Used to communicate with a single HBase table.
@@ -425,7 +428,9 @@ public interface HTableInterface extends
    * @param protocol The class or interface defining the remote protocol
    * @param row The row key used to identify the remote region location
    * @return A CoprocessorProtocol instance
+   * @deprecated since 0.96.  Use {@link HTableInterface#coprocessorService(byte[])} instead.
    */
+  @Deprecated
   <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, byte[] row);
 
   /**
@@ -450,7 +455,11 @@ public interface HTableInterface extends
    * method
    * @return a <code>Map</code> of region names to
    * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} return values
+   *
+   * @deprecated since 0.96.  Use
+   * {@link HTableInterface#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)} instead.
    */
+  @Deprecated
   <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
       Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable)
       throws IOException, Throwable;
@@ -486,13 +495,97 @@ public interface HTableInterface extends
    * @param <R> Return type for the
    * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
    * method
+   *
+   * @deprecated since 0.96.
+   * Use {@link HTableInterface#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)} instead.
    */
+  @Deprecated
   <T extends CoprocessorProtocol, R> void coprocessorExec(
       Class<T> protocol, byte[] startKey, byte[] endKey,
       Batch.Call<T,R> callable, Batch.Callback<R> callback)
       throws IOException, Throwable;
 
   /**
+   * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the
+   * table region containing the specified row.  The row given does not actually have
+   * to exist.  Whichever region would contain the row based on start and end keys will
+   * be used.  Note that the {@code row} parameter is also not passed to the
+   * coprocessor handler registered for this protocol, unless the {@code row}
+   * is separately passed as an argument in the service request.  The parameter
+   * here is only used to locate the region used to handle the call.
+   *
+   * <p>
+   * The obtained {@link com.google.protobuf.RpcChannel} instance can be used to access a published
+   * coprocessor {@link com.google.protobuf.Service} using standard protobuf service invocations:
+   * </p>
+   *
+   * <div style="background-color: #cccccc; padding: 2px">
+   * <blockquote><pre>
+   * CoprocessorRpcChannel channel = myTable.coprocessorService(rowkey);
+   * MyService.BlockingInterface service = MyService.newBlockingStub(channel);
+   * MyCallRequest request = MyCallRequest.newBuilder()
+   *     ...
+   *     .build();
+   * MyCallResponse response = service.myCall(null, request);
+   * </pre></blockquote></div>
+   *
+   * @param row The row key used to identify the remote region location
+   * @return A CoprocessorRpcChannel instance
+   */
+  CoprocessorRpcChannel coprocessorService(byte[] row);
+
+  /**
+   * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
+   * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive),
+   * and invokes the passed {@link Batch.Call#call(Object)} method with each {@link Service}
+   * instance.
+   *
+   * @param service the protocol buffer {@code Service} implementation to call
+   * @param startKey start region selection with region containing this row.  If {@code null}, the
+   *                 selection will start with the first table region.
+   * @param endKey select regions up to and including the region containing this row.
+   *               If {@code null}, selection will continue through the last table region.
+   * @param callable this instance's {@link Batch.Call#call(Object)} method will be invoked once
+   *                 per table region, using the {@link Service} instance connected to that region.
+   * @param <T> the {@link Service} subclass to connect to
+   * @param <R> Return type for the {@code callable} parameter's
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} method
+   * @return a map of result values keyed by region name
+   */
+  <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
+      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
+      throws ServiceException, Throwable;
+
+  /**
+   * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
+   * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive),
+   * and invokes the passed {@link Batch.Call#call(Object)} method with each {@link Service}
+   * instance.
+   *
+   * <p>
+   * The given
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
+   * method will be called with the return value from each region's {@link Batch.Call#call(Object)}
+   * invocation.
+   *</p>
+   *
+   * @param service the protocol buffer {@code Service} implementation to call
+   * @param startKey start region selection with region containing this row.  If {@code null}, the
+   *                 selection will start with the first table region.
+   * @param endKey select regions up to and including the region containing this row.
+   *               If {@code null}, selection will continue through the last table region.
+   * @param callable this instance's {@link Batch.Call#call(Object)} method will be invoked once
+   *                 per table region, using the {@link Service} instance connected to that region.
+   * @param callback invoked with the return value from each region's {@link Batch.Call#call(Object)}
+   * @param <T> the {@link Service} subclass to connect to
+   * @param <R> Return type for the {@code callable} parameter's
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} method
+   */
+  <T extends Service, R> void coprocessorService(final Class<T> service,
+      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
+      final Batch.Callback<R> callback) throws ServiceException, Throwable;
+
+  /**
    * See {@link #setAutoFlush(boolean, boolean)}
    *
    * @param autoFlush

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java Tue Sep 18 06:32:57 2012
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
@@ -493,6 +496,25 @@ public class HTablePool implements Close
     }
 
     @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+      return table.coprocessorService(row);
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+        byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
+        throws ServiceException, Throwable {
+      return table.coprocessorService(service, startKey, endKey, callable);
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service,
+        byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
+        throws ServiceException, Throwable {
+      table.coprocessorService(service, startKey, endKey, callable, callback);
+    }
+
+    @Override
     public String toString() {
       return "PooledHTable{" + ", table=" + table + '}';
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java Tue Sep 18 06:32:57 2012
@@ -67,6 +67,7 @@ public abstract class Batch {
    * @see Batch#forMethod(java.lang.reflect.Method, Object...)
    * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
    */
+  @Deprecated
   public static <T extends CoprocessorProtocol,R> Call<T,R> forMethod(
       final Class<T> protocol, final String method, final Object... args)
   throws NoSuchMethodException {
@@ -100,6 +101,7 @@ public abstract class Batch {
    * return the results
    * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
    */
+  @Deprecated
   public static <T extends CoprocessorProtocol,R> Call<T,R> forMethod(
       final Method method, final Object... args) {
     return new Call<T,R>() {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java Tue Sep 18 06:32:57 2012
@@ -51,7 +51,10 @@ import java.lang.reflect.Method;
  * @see ExecResult
  * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
  * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
+ * @deprecated since 0.96.0.  Use {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}
+ * or related methods instead.
  */
+@Deprecated
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class Exec extends Invocation implements Row {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java Tue Sep 18 06:32:57 2012
@@ -46,7 +46,10 @@ import java.io.Serializable;
  * @see Exec
  * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
  * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
+ * @deprecated since 0.96.0.  Use {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}
+ * or related methods instead.
  */
+@Deprecated
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class ExecResult implements Writable {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/package-info.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/package-info.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/package-info.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/package-info.java Tue Sep 18 06:32:57 2012
@@ -37,30 +37,42 @@ protocols.
 
 <p>
 In order to provide a custom RPC protocol to clients, a coprocessor implementation
-defines an interface that extends {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}.
-The interface can define any methods that the coprocessor wishes to expose.
-Using this protocol, you can communicate with the coprocessor instances via
-the {@link org.apache.hadoop.hbase.client.HTable#coprocessorProxy(Class, byte[])} and
-{@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
+must:
+<ul>
+ <li>Define a protocol buffer Service and supporting Message types for the RPC methods.
+ See the
+ <a href="https://developers.google.com/protocol-buffers/docs/proto#services">protocol buffer guide</a>
+ for more details on defining services.</li>
+ <li>Generate the Service and Message code using the protoc compiler</li>
+ <li>Implement the generated Service interface in your coprocessor class and implement the
+ {@link org.apache.hadoop.hbase.coprocessor.CoprocessorService} interface.  The
+ {@link org.apache.hadoop.hbase.coprocessor.CoprocessorService#getService()}
+ method should return a reference to the Endpoint's protocol buffer Service instance.</li>
+</ul>
+Clients may then call the defined service methods on coprocessor instances via
+the {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])},
+{@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}, and
+{@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
 methods.
 </p>
 
 <p>
-Since {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol} instances are
-associated with individual regions within the table, the client RPC calls
-must ultimately identify which regions should be used in the <code>CoprocessorProtocol</code>
+Since coprocessor Service instances are associated with individual regions within the table,
+the client RPC calls must ultimately identify which regions should be used in the Service
 method invocations.  Since regions are seldom handled directly in client code
 and the region names may change over time, the coprocessor RPC calls use row keys
 to identify which regions should be used for the method invocations.  Clients
-can call <code>CoprocessorProtocol</code> methods against either:
+can call coprocessor Service methods against either:
 <ul>
  <li><strong>a single region</strong> - calling
-   {@link org.apache.hadoop.hbase.client.HTable#coprocessorProxy(Class, byte[])}
-   with a single row key.  This returns a dynamic proxy of the <code>CoprocessorProtocol</code>
-   interface which uses the region containing the given row key (even if the
-   row does not exist) as the RPC endpoint.</li>
+   {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}
+   with a single row key.  This returns a {@link org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel}
+   instance which communicates with the region containing the given row key (even if the
+   row does not exist) as the RPC endpoint.  Clients can then use the {@code CoprocessorRpcChannel}
+   instance in creating a new Service stub to call RPC methods on the region's coprocessor.</li>
  <li><strong>a range of regions</strong> - calling
-   {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
+   {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}
+   or {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
    with a starting row key and an ending row key.  All regions in the table
    from the region containing the start row key to the region containing the end
    row key (inclusive), will be used as the RPC endpoints.</li>
@@ -68,17 +80,16 @@ can call <code>CoprocessorProtocol</code
 </p>
 
 <p><em>Note that the row keys passed as parameters to the <code>HTable</code>
-methods are not passed to the <code>CoprocessorProtocol</code> implementations.
+methods are not passed directly to the coprocessor Service implementations.
 They are only used to identify the regions for endpoints of the remote calls.
 </em></p>
 
 <p>
 The {@link org.apache.hadoop.hbase.client.coprocessor.Batch} class defines two
-interfaces used for <code>CoprocessorProtocol</code> invocations against
-multiple regions.  Clients implement {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} to
-call methods of the actual <code>CoprocessorProtocol</code> instance.  The interface's
-<code>call()</code> method will be called once per selected region, passing the
-<code>CoprocessorProtocol</code> instance for the region as a parameter.  Clients
+interfaces used for coprocessor Service invocations against multiple regions.  Clients implement
+{@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} to call methods of the actual
+coprocessor Service instance.  The interface's <code>call()</code> method will be called once
+per selected region, passing the Service instance for the region as a parameter.  Clients
 can optionally implement {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback}
 to be notified of the results from each region invocation as they complete.
 The instance's {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
@@ -88,112 +99,128 @@ return value from each region.
 
 <h2><a name="usage">Example usage</a></h2>
 <p>
-To start with, let's use a fictitious coprocessor, <code>RowCountCoprocessor</code>
+To start with, let's use a fictitious coprocessor, <code>RowCountEndpoint</code>
 that counts the number of rows and key-values in each region where it is running.
-For clients to query this information, the coprocessor defines and implements
-the following {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol} extension
-interface:
+For clients to query this information, the coprocessor defines the following protocol buffer
+service:
 </p>
 
 <div style="background-color: #cccccc; padding: 2px">
 <blockquote><pre>
-public interface RowCountProtocol extends CoprocessorProtocol {
-  long getRowCount();
-  long getRowCount(Filter filt);
-  long getKeyValueCount();
+message CountRequest {
+}
+
+message CountResponse {
+  required int64 count = 1 [default = 0];
+}
+
+service RowCountService {
+  rpc getRowCount(CountRequest)
+    returns (CountResponse);
+  rpc getKeyValueCount(CountRequest)
+    returns (CountResponse);
 }
 </pre></blockquote></div>
 
 <p>
-Now we need a way to access the results that <code>RowCountCoprocessor</code>
-is making available.  If we want to find the row count for all regions, we could
-use:
+Next run the protoc compiler on the .proto file to generate Java code for the Service interface.
+The generated {@code RowCountService} interface should look something like:
 </p>
-
 <div style="background-color: #cccccc; padding: 2px">
 <blockquote><pre>
-HTable table = new HTable("mytable");
-// find row count keyed by region name
-Map<byte[],Long> results = table.coprocessorExec(
-    RowCountProtocol.class, // the protocol interface we're invoking
-    null, null,             // start and end row keys
-    new Batch.Call<RowCountProtocol,Long>() {
-       public Long call(RowCountProtocol counter) {
-         return counter.getRowCount();
-       }
-     });
+public static abstract class RowCountService
+  implements com.google.protobuf.Service {
+  ...
+  public interface Interface {
+    public abstract void getRowCount(
+        com.google.protobuf.RpcController controller,
+        org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountRequest request,
+        com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountResponse> done);
+
+    public abstract void getKeyValueCount(
+        com.google.protobuf.RpcController controller,
+        org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountRequest request,
+        com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountResponse> done);
+  }
+}
 </pre></blockquote></div>
 
 <p>
-This will return a <code>java.util.Map</code> of the <code>counter.getRowCount()</code>
-result for the <code>RowCountCoprocessor</code> instance running in each region
-of <code>mytable</code>, keyed by the region name.
-</p>
-
-<p>
-By implementing {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
-as an anonymous class, we can invoke <code>RowCountProtocol</code> methods
-directly against the {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
-method's argument.  Calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}
-will take care of invoking <code>Batch.Call.call()</code> against our anonymous class
-with the <code>RowCountCoprocessor</code> instance for each table region.
+Our coprocessor Service will need to implement this interface and the {@link org.apache.hadoop.hbase.coprocessor.CoprocessorService}
+in order to be registered correctly as an endpoint.  For the sake of simplicity the server-side
+implementation is omitted.  To see the implementing code, please see the
+{@link org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint} class in the HBase source code.
 </p>
 
 <p>
-For this simple case, where we only want to obtain the result from a single
-<code>CoprocessorProtocol</code> method, there's also a bit of syntactic sugar
-we can use to cut down on the amount of code required:
+Now we need a way to access the results that <code>RowCountService</code>
+is making available.  If we want to find the row count for all regions, we could
+use:
 </p>
 
 <div style="background-color: #cccccc; padding: 2px">
 <blockquote><pre>
-HTable table = new HTable("mytable");
-Batch.Call<RowCountProtocol,Long> call = Batch.forMethod(RowCountProtocol.class, "getRowCount");
-Map<byte[],Long> results = table.coprocessorExec(RowCountProtocol.class, null, null, call);
+HTable table = new HTable(conf, "mytable");
+final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
+Map<byte[],Long> results = table.coprocessorService(
+    ExampleProtos.RowCountService.class, // the protocol interface we're invoking
+    null, null,                          // start and end row keys
+    new Batch.Call<ExampleProtos.RowCountService,Long>() {
+        public Long call(ExampleProtos.RowCountService counter) throws IOException {
+          BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback =
+              new BlockingRpcCallback<ExampleProtos.CountResponse>();
+          counter.getRowCount(null, request, rpcCallback);
+          ExampleProtos.CountResponse response = rpcCallback.get();
+          return response.hasCount() ? response.getCount() : 0;
+        }
+    });
 </pre></blockquote></div>
 
 <p>
-{@link org.apache.hadoop.hbase.client.coprocessor.Batch#forMethod(Class, String, Object...)}
-is a simple factory method that will return a {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
-instance that will call <code>RowCountProtocol.getRowCount()</code> for us
-using reflection.
+This will return a <code>java.util.Map</code> of the <code>counter.getRowCount()</code>
+result for the <code>RowCountService</code> instance running in each region
+of <code>mytable</code>, keyed by the region name.
 </p>
 
 <p>
-However, if you want to perform additional processing on the results,
-implementing {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
-directly will provide more power and flexibility.  For example, if you would
-like to combine row count and key-value count for each region:
+By implementing {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
+as an anonymous class, we can invoke <code>RowCountService</code> methods
+directly against the {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
+method's argument.  Calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}
+will take care of invoking <code>Batch.Call.call()</code> against our anonymous class
+with the <code>RowCountService</code> instance for each table region.
 </p>
 
-<div style="background-color: #cccccc; padding: 2px">
-<blockquote><pre>
-HTable table = new HTable("mytable");
-// combine row count and kv count for region
-Map<byte[],Pair<Long,Long>> results = table.coprocessorExec(
-    RowCountProtocol.class,
-    null, null,
-    new Batch.Call<RowCountProtocol,Pair<Long,Long>>() {
-        public Pair<Long,Long> call(RowCountProtocol counter) {
-          return new Pair(counter.getRowCount(), counter.getKeyValueCount());
-        }
-    });
-</pre></blockquote></div>
-
 <p>
-Similarly, you could average the number of key-values per row for each region:
+Implementing {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} also allows you to
+perform additional processing against each region's Service instance.  For example, if you would
+like to combine row count and key-value count for each region:
 </p>
 
 <div style="background-color: #cccccc; padding: 2px">
 <blockquote><pre>
-Map<byte[],Double> results = table.coprocessorExec(
-    RowCountProtocol.class,
-    null, null,
-    new Batch.Call<RowCountProtocol,Double>() {
-        public Double call(RowCountProtocol counter) {
-          return ((double)counter.getKeyValueCount()) / ((double)counter.getRowCount());
-        }
-    });
+HTable table = new HTable(conf, "mytable");
+// combine row count and kv count for region
+final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
+Map<byte[],Pair<Long,Long>> results = table.coprocessorService(
+    ExampleProtos.RowCountService.class, // the protocol interface we're invoking
+    null, null,                          // start and end row keys
+    new Batch.Call<ExampleProtos.RowCountService,Pair<Long,Long>>() {
+       public Pair<Long,Long> call(ExampleProtos.RowCountService counter) throws IOException {
+         BlockingRpcCallback<ExampleProtos.CountResponse> rowCallback =
+             new BlockingRpcCallback<ExampleProtos.CountResponse>();
+         counter.getRowCount(null, request, rowCallback);
+
+         BlockingRpcCallback<ExampleProtos.CountResponse> kvCallback =
+             new BlockingRpcCallback<ExampleProtos.CountResponse>();
+         counter.getKeyValueCount(null, request, kvCallback);
+
+         ExampleProtos.CountResponse rowResponse = rowCallback.get();
+         ExampleProtos.CountResponse kvResponse = kvCallback.get();
+         return new Pair(rowResponse.hasCount() ? rowResponse.getCount() : 0,
+             kvResponse.hasCount() ? kvResponse.getCount() : 0);
+    }
+});
 </pre></blockquote></div>
 */
 package org.apache.hadoop.hbase.client.coprocessor;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Tue Sep 18 06:32:57 2012
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.coprocessor;
 
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -250,6 +253,11 @@ public abstract class CoprocessorHost<E 
    */
   public E loadInstance(Class<?> implClass, int priority, Configuration conf)
       throws IOException {
+    if (!Coprocessor.class.isAssignableFrom(implClass)) {
+      throw new IOException("Configured class " + implClass.getName() + " must implement "
+          + Coprocessor.class.getName() + " interface ");
+    }
+
     // create the instance
     Coprocessor impl;
     Object o = null;
@@ -435,7 +443,7 @@ public abstract class CoprocessorHost<E 
           byte[] qualifier, long amount, boolean writeToWAL)
           throws IOException {
         return table.incrementColumnValue(row, family, qualifier, amount,
-          writeToWAL);
+            writeToWAL);
       }
 
       @Override
@@ -537,6 +545,25 @@ public abstract class CoprocessorHost<E 
       }
 
       @Override
+      public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        return table.coprocessorService(row);
+      }
+
+      @Override
+      public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+          byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
+          throws ServiceException, Throwable {
+        return table.coprocessorService(service, startKey, endKey, callable);
+      }
+
+      @Override
+      public <T extends Service, R> void coprocessorService(Class<T> service,
+          byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
+          throws ServiceException, Throwable {
+        table.coprocessorService(service, startKey, endKey, callable, callback);
+      }
+
+      @Override
       public void mutateRow(RowMutations rm) throws IOException {
         table.mutateRow(rm);
       }

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorService.java?rev=1387001&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorService.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorService.java Tue Sep 18 06:32:57 2012
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.Service;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Coprocessor endpoints providing protobuf services should implement this
+ * interface and return the {@link Service} instance via {@link #getService()}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface CoprocessorService {
+  public Service getService();
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java?rev=1387001&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java Tue Sep 18 06:32:57 2012
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Sample coprocessor endpoint exposing a Service interface for counting rows and key values.
+ *
+ * <p>
+ * For the protocol buffer definition of the RowCountService, see the source file located under
+ * hbase-server/src/main/protobuf/Examples.proto.
+ * </p>
+ */
+public class RowCountEndpoint extends ExampleProtos.RowCountService
+    implements Coprocessor, CoprocessorService {
+  private RegionCoprocessorEnvironment env;
+
+  public RowCountEndpoint() {
+  }
+
+  /**
+   * Just returns a reference to this object, which implements the RowCountService interface.
+   */
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  /**
+   * Returns a count of the rows in the region where this coprocessor is loaded.
+   */
+  @Override
+  public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
+                          RpcCallback<ExampleProtos.CountResponse> done) {
+    Scan scan = new Scan();
+    scan.setFilter(new FirstKeyOnlyFilter());
+    ExampleProtos.CountResponse response = null;
+    InternalScanner scanner = null;
+    try {
+      scanner = env.getRegion().getScanner(scan);
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      boolean hasMore = false;
+      byte[] lastRow = null;
+      long count = 0;
+      do {
+        hasMore = scanner.next(results);
+        for (KeyValue kv : results) {
+          byte[] currentRow = kv.getRow();
+          if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
+            lastRow = currentRow;
+            count++;
+          }
+        }
+        results.clear();
+      } while (hasMore);
+
+      response = ExampleProtos.CountResponse.newBuilder()
+          .setCount(count).build();
+    } catch (IOException ioe) {
+      ResponseConverter.setControllerException(controller, ioe);
+    } finally {
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ignored) {}
+      }
+    }
+    done.run(response);
+  }
+
+  /**
+   * Returns a count of all KeyValues in the region where this coprocessor is loaded.
+   */
+  @Override
+  public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request,
+                               RpcCallback<ExampleProtos.CountResponse> done) {
+    ExampleProtos.CountResponse response = null;
+    InternalScanner scanner = null;
+    try {
+      scanner = env.getRegion().getScanner(new Scan());
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      boolean hasMore = false;
+      long count = 0;
+      do {
+        hasMore = scanner.next(results);
+        for (KeyValue kv : results) {
+          count++;
+        }
+        results.clear();
+      } while (hasMore);
+
+      response = ExampleProtos.CountResponse.newBuilder()
+          .setCount(count).build();
+    } catch (IOException ioe) {
+      ResponseConverter.setControllerException(controller, ioe);
+    } finally {
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ignored) {}
+      }
+    }
+    done.run(response);
+  }
+
+  /**
+   * Stores a reference to the coprocessor environment provided by the
+   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
+   * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
+   * on a table region, so always expects this to be an instance of
+   * {@link RegionCoprocessorEnvironment}.
+   * @param env the environment provided by the coprocessor host
+   * @throws IOException if the provided environment is not an instance of
+   * {@code RegionCoprocessorEnvironment}
+   */
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof RegionCoprocessorEnvironment) {
+      this.env = (RegionCoprocessorEnvironment)env;
+    } else {
+      throw new CoprocessorException("Must be loaded on a table region!");
+    }
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // nothing to do
+  }
+}



Mime
View raw message