hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject svn commit: r1037026 [1/2] - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/client/coprocessor/ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/...
Date Fri, 19 Nov 2010 21:14:06 GMT
Author: apurtell
Date: Fri Nov 19 21:14:05 2010
New Revision: 1037026

URL: http://svn.apache.org/viewvc?rev=1037026&view=rev
Log:
HBASE-2002 Coprocessors: Client side support

HBASE-2321 Support RPC interface changes at runtime


Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/package-info.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorProtocol.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Action.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/trunk/src/main/resources/hbase-default.xml

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1037026&r1=1037025&r2=1037026&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Nov 19 21:14:05 2010
@@ -33,7 +33,9 @@ Release 0.90.0 - Unreleased
    HBASE-2641  HBASE-2641 Refactor HLog splitLog, hbase-2437 continued;
                break out split code as new classes
                (James Kennedy via Stack)
-               
+   HBASE-2002  Coprocessors: Client side support; Support RPC interface
+               changes at runtime (Gary Helmling via Andrew Purtell)
+
 
   BUG FIXES
    HBASE-1791  Timeout in IndexRecordWriter (Bradford Stephens via Andrew

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Action.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Action.java?rev=1037026&r1=1037025&r2=1037026&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Action.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Action.java Fri Nov 19 21:14:05 2010
@@ -32,12 +32,12 @@ import org.apache.hadoop.io.Writable;
  * {@link HTable::batch} to associate the action with it's region and maintain 
  * the index from the original request. 
  */
-public class Action implements Writable, Comparable {
+public class Action<R> implements Writable, Comparable {
 
   private byte[] regionName;
   private Row action;
   private int originalIndex;
-  private Result result;
+  private R result;
 
   public Action() {
     super();
@@ -58,11 +58,11 @@ public class Action implements Writable,
     this.regionName = regionName;
   }
 
-  public Result getResult() {
+  public R getResult() {
     return result;
   }
 
-  public void setResult(Result result) {
+  public void setResult(R result) {
     this.result = result;
   }
 
@@ -87,14 +87,15 @@ public class Action implements Writable,
     Bytes.writeByteArray(out, regionName);
     HbaseObjectWritable.writeObject(out, action, Row.class, null);
     out.writeInt(originalIndex);
-    HbaseObjectWritable.writeObject(out, result, Result.class, null);
+    HbaseObjectWritable.writeObject(out, result,
+        result != null ? result.getClass() : Writable.class, null);
   }
 
   public void readFields(final DataInput in) throws IOException {
     this.regionName = Bytes.readByteArray(in);
     this.action = (Row) HbaseObjectWritable.readObject(in, null);
     this.originalIndex = in.readInt();
-    this.result = (Result) HbaseObjectWritable.readObject(in, null);
+    this.result = (R) HbaseObjectWritable.readObject(in, null);
   }
 
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1037026&r1=1037025&r2=1037026&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Fri Nov 19 21:14:05 2010
@@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -258,6 +260,46 @@ public interface HConnection extends Abo
       throws IOException, InterruptedException;
 
   /**
+   * Parameterized batch processing, allowing varying return types for different
+   * {@link Row} implementations.
+   */
+  public <R> void processBatchCallback(List<? extends Row> list,
+      byte[] tableName,
+      ExecutorService pool,
+      Object[] results,
+      Batch.Callback<R> callback) throws IOException, InterruptedException;
+
+
+  /**
+   * Executes the given
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
+   * callable for each row in the given list and invokes
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
+   * for each result returned.
+   *
+   * @param protocol the protocol interface being called
+   * @param rows a list of row keys for which the callable should be invoked
+   * @param tableName table name for the coprocessor invoked
+   * @param pool ExecutorService used to submit the calls per row
+   * @param call instance on which to invoke
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
+   * for each row
+   * @param callback instance on which to invoke
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
+   * for each result
+   * @param <T> the protocol interface type
+   * @param <R> the callable's return type
+   * @throws IOException
+   */
+  public <T extends CoprocessorProtocol,R> void processExecs(
+      final Class<T> protocol,
+      List<byte[]> rows,
+      final byte[] tableName,
+      ExecutorService pool,
+      final Batch.Call<T,R> call,
+      final Batch.Callback<R> callback) throws IOException, Throwable;
+
+  /**
    * Process a batch of Puts.
    *
    * @param list The collection of actions. The list is mutated: all successful Puts
@@ -296,4 +338,4 @@ public interface HConnection extends Abo
    */
   public void prewarmRegionCache(final byte[] tableName,
       final Map<HRegionInfo, HServerAddress> regions);
-}
\ No newline at end of file
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1037026&r1=1037025&r2=1037026&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Nov 19 21:14:05 2010
@@ -20,15 +20,10 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.lang.reflect.Proxy;
 import java.lang.reflect.UndeclaredThrowableException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -53,10 +48,8 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
-import org.apache.hadoop.hbase.ipc.HBaseRPC;
-import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
@@ -1051,27 +1044,27 @@ public class HConnectionManager {
       this.closed = true;
     }
 
-    private Callable<MultiResponse> createCallable(
+    private <R> Callable<MultiResponse> createCallable(
         final HServerAddress address,
-        final MultiAction multi,
+        final MultiAction<R> multi,
         final byte [] tableName) {
       final HConnection connection = this;
       return new Callable<MultiResponse>() {
-        public MultiResponse call() throws IOException {
-          return getRegionServerWithoutRetries(
-              new ServerCallable<MultiResponse>(connection, tableName, null) {
-                public MultiResponse call() throws IOException {
-                  return server.multi(multi);
-                }
-                @Override
-                public void instantiateServer(boolean reload) throws IOException {
-                  server = connection.getHRegionConnection(address);
-                }
-              }
-          );
-        }
-      };
-    }
+       public MultiResponse call() throws IOException {
+         return getRegionServerWithoutRetries(
+             new ServerCallable<MultiResponse>(connection, tableName, null) {
+               public MultiResponse call() throws IOException {
+                 return server.multi(multi);
+               }
+               @Override
+               public void instantiateServer(boolean reload) throws IOException {
+                 server = connection.getHRegionConnection(address);
+               }
+             }
+         );
+       }
+     };
+   }
 
     public void processBatch(List<Row> list,
         final byte[] tableName,
@@ -1083,9 +1076,98 @@ public class HConnectionManager {
         throw new IllegalArgumentException("argument results must be the same size as argument list");
       }
 
+      processBatchCallback(list, tableName, pool, results, null);
+    }
+
+    /**
+     * Executes the given
+     * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
+     * callable for each row in the
+     * given list and invokes
+     * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
+     * for each result returned.
+     *
+     * @param protocol the protocol interface being called
+     * @param rows a list of row keys for which the callable should be invoked
+     * @param tableName table name for the coprocessor invoked
+     * @param pool ExecutorService used to submit the calls per row
+     * @param callable instance on which to invoke
+     * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
+     * for each row
+     * @param callback instance on which to invoke
+     * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
+     * for each result
+     * @param <T> the protocol interface type
+     * @param <R> the callable's return type
+     * @throws IOException
+     */
+    public <T extends CoprocessorProtocol,R> void processExecs(
+        final Class<T> protocol,
+        List<byte[]> rows,
+        final byte[] tableName,
+        ExecutorService pool,
+        final Batch.Call<T,R> callable,
+        final Batch.Callback<R> callback)
+      throws IOException, Throwable {
+
+      Map<byte[],Future<R>> futures =
+          new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
+      for (final byte[] r : rows) {
+        final ExecRPCInvoker invoker =
+            new ExecRPCInvoker(conf, this, protocol, tableName, r);
+        Future<R> future = pool.submit(
+            new Callable<R>() {
+              public R call() throws Exception {
+                T instance = (T)Proxy.newProxyInstance(conf.getClassLoader(),
+                    new Class[]{protocol},
+                    invoker);
+                R result = callable.call(instance);
+                byte[] region = invoker.getRegionName();
+                if (callback != null) {
+                  callback.update(region, r, result);
+                }
+                return result;
+              }
+            });
+        futures.put(r, future);
+      }
+      for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
+        try {
+          e.getValue().get();
+        } catch (ExecutionException ee) {
+          LOG.warn("Error executing for row "+Bytes.toStringBinary(e.getKey()), ee);
+          throw ee.getCause();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted executing for row " +
+              Bytes.toStringBinary(e.getKey()), ie);
+        }
+      }
+    }
+
+    /**
+     * Parameterized batch processing, allowing varying return types for
+     * different {@link Row} implementations.
+     */
+    public <R> void processBatchCallback(
+        List<? extends Row> list,
+        byte[] tableName,
+        ExecutorService pool,
+        Object[] results,
+        Batch.Callback<R> callback)
+    throws IOException, InterruptedException {
+
+      // results must be the same size as list
+      if (results.length != list.size()) {
+        throw new IllegalArgumentException(
+            "argument results must be the same size as argument list");
+      }
       if (list.size() == 0) {
         return;
       }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("expecting "+results.length+" results");
+      }
 
       // Keep track of the most recent servers for any given item for better
       // exceptional reporting.
@@ -1102,10 +1184,9 @@ public class HConnectionManager {
           LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
           Thread.sleep(sleepTime);
         }
-
         // step 1: break up into regionserver-sized chunks and build the data structs
-
-        Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();
+        Map<HServerAddress, MultiAction<R>> actionsByServer =
+          new HashMap<HServerAddress, MultiAction<R>>();
         for (int i = 0; i < workingList.size(); i++) {
           Row row = workingList.get(i);
           if (row != null) {
@@ -1113,13 +1194,13 @@ public class HConnectionManager {
             HServerAddress address = loc.getServerAddress();
             byte[] regionName = loc.getRegionInfo().getRegionName();
 
-            MultiAction actions = actionsByServer.get(address);
+            MultiAction<R> actions = actionsByServer.get(address);
             if (actions == null) {
-              actions = new MultiAction();
+              actions = new MultiAction<R>();
               actionsByServer.put(address, actions);
             }
 
-            Action action = new Action(regionName, row, i);
+            Action<R> action = new Action<R>(regionName, row, i);
             lastServers[i] = address;
             actions.add(regionName, action);
           }
@@ -1128,15 +1209,18 @@ public class HConnectionManager {
         // step 2: make the requests
 
         Map<HServerAddress,Future<MultiResponse>> futures =
-            new HashMap<HServerAddress, Future<MultiResponse>>(actionsByServer.size());
+            new HashMap<HServerAddress, Future<MultiResponse>>(
+                actionsByServer.size());
 
-        for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
+        for (Entry<HServerAddress, MultiAction<R>> e
+             : actionsByServer.entrySet()) {
           futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
         }
 
         // step 3: collect the failures and successes and prepare for retry
 
-        for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures.entrySet()) {
+        for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer
+             : futures.entrySet()) {
           HServerAddress address = responsePerServer.getKey();
 
           try {
@@ -1161,6 +1245,11 @@ public class HConnectionManager {
                 } else {
                   // Result might be an Exception, including DNRIOE
                   results[regionResult.getFirst()] = regionResult.getSecond();
+                  if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
+                    callback.update(e.getKey(),
+                        list.get(regionResult.getFirst()).getRow(),
+                        (R)regionResult.getSecond());
+                  }
                 }
               }
             }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1037026&r1=1037025&r2=1037026&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Fri Nov 19 21:14:05 2010
@@ -19,9 +19,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.IOException;
+import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -51,6 +52,9 @@ import org.apache.hadoop.hbase.NotServin
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
@@ -1325,4 +1329,77 @@ public class HTable implements HTableInt
     return HConnectionManager.getConnection(HBaseConfiguration.create()).
     getRegionCachePrefetch(tableName);
   }
+
+  @Override
+  public <T extends CoprocessorProtocol> T coprocessorProxy(
+      Class<T> protocol, byte[] row) {
+    return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(),
+        new Class[]{protocol},
+        new ExecRPCInvoker(configuration,
+            connection,
+            protocol,
+            tableName,
+            row));
+  }
+
+  @Override
+  public <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
+      Class<T> protocol, byte[] startKey, byte[] endKey,
+      Batch.Call<T,R> callable)
+      throws IOException, Throwable {
+
+    final Map<byte[],R> results = new TreeMap<byte[],R>(
+        Bytes.BYTES_COMPARATOR);
+    coprocessorExec(protocol, startKey, endKey, callable,
+        new Batch.Callback<R>(){
+      public void update(byte[] region, byte[] row, R value) {
+        results.put(region, value);
+      }
+    });
+    return results;
+  }
+
+  @Override
+  public <T extends CoprocessorProtocol, R> void coprocessorExec(
+      Class<T> protocol, byte[] startKey, byte[] endKey,
+      Batch.Call<T,R> callable, Batch.Callback<R> callback)
+      throws IOException, Throwable {
+
+    // get regions covered by the row range
+    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
+    connection.processExecs(protocol, keys, tableName, pool, callable,
+        callback);
+  }
+
+  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
+  throws IOException {
+    Pair<byte[][],byte[][]> startEndKeys = getStartEndKeys();
+    byte[][] startKeys = startEndKeys.getFirst();
+    byte[][] endKeys = startEndKeys.getSecond();
+
+    if (start == null) {
+      start = HConstants.EMPTY_START_ROW;
+    }
+    if (end == null) {
+      end = HConstants.EMPTY_END_ROW;
+    }
+
+    List<byte[]> rangeKeys = new ArrayList<byte[]>();
+    for (int i=0; i<startKeys.length; i++) {
+      if (Bytes.compareTo(start, startKeys[i]) >= 0 ) {
+        if (Bytes.equals(endKeys[i], HConstants.EMPTY_END_ROW) ||
+            Bytes.compareTo(start, endKeys[i]) < 0) {
+          rangeKeys.add(start);
+        }
+      } else if (Bytes.equals(end, HConstants.EMPTY_END_ROW) ||
+          Bytes.compareTo(startKeys[i], end) <= 0) {
+        rangeKeys.add(startKeys[i]);
+      } else {
+        break; // past stop
+      }
+    }
+
+    return rangeKeys;
+  }
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1037026&r1=1037025&r2=1037026&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Fri Nov 19 21:14:05 2010
@@ -25,6 +25,10 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+import java.util.Map;
 
 /**
  * Used to communicate with a single HBase table.
@@ -350,4 +354,83 @@ public interface HTableInterface {
    * @see #unlockRow
    */
   void unlockRow(RowLock rl) throws IOException;
+
+  /**
+   * Creates and returns a proxy to the CoprocessorProtocol instance running in the
+   * region containing the specified row.  The row given does not actually have
+   * to exist.  Whichever region would contain the row based on start and end keys will
+   * be used.  Note that the {@code row} parameter is also not passed to the
+   * coprocessor handler registered for this protocol, unless the {@code row}
+   * is separately passed as an argument in a proxy method call.  The parameter
+   * here is just used to locate the region used to handle the call.
+   *
+   * @param protocol The class or interface defining the remote protocol
+   * @param row The row key used to identify the remote region location
+   * @return
+   */
+  <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, byte[] row);
+
+  /**
+   * Invoke the passed
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} against
+   * the {@link CoprocessorProtocol} instances running in the selected regions.
+   * All regions beginning with the region containing the <code>startKey</code>
+   * row, through to the region containing the <code>endKey</code> row (inclusive)
+   * will be used.  If <code>startKey</code> or <code>endKey</code> is
+   * <code>null</code>, the first and last regions in the table, respectively,
+   * will be used in the range selection.
+   *
+   * @param protocol the CoprocessorProtocol implementation to call
+   * @param startKey start region selection with region containing this row
+   * @param endKey select regions up to and including the region containing
+   * this row
+   * @param callable wraps the CoprocessorProtocol implementation method calls
+   * made per-region
+   * @param <T> CoprocessorProtocol subclass for the remote invocation
+   * @param <R> Return type for the
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
+   * method
+   * @return a <code>Map</code> of region names to
+   * {@link Batch.Call#call(Object)} return values
+   */
+  <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
+      Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable)
+      throws IOException, Throwable;
+
+  /**
+   * Invoke the passed
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} against
+   * the {@link CoprocessorProtocol} instances running in the selected regions.
+   * All regions beginning with the region containing the <code>startKey</code>
+   * row, through to the region containing the <code>endKey</code> row
+   * (inclusive)
+   * will be used.  If <code>startKey</code> or <code>endKey</code> is
+   * <code>null</code>, the first and last regions in the table, respectively,
+   * will be used in the range selection.
+   *
+   * <p>
+   * For each result, the given
+   * {@link Batch.Callback#update(byte[], byte[], Object)}
+   * method will be called.
+   *</p>
+   *
+   * @param protocol the CoprocessorProtocol implementation to call
+   * @param startKey start region selection with region containing this row
+   * @param endKey select regions up to and including the region containing
+   * this row
+   * @param callable wraps the CoprocessorProtocol implementation method calls
+   * made per-region
+   * @param callback an instance upon which
+   * {@link Batch.Callback#update(byte[], byte[], Object)} with the
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
+   * return value for each region
+   * @param <T> CoprocessorProtocol subclass for the remote invocation
+   * @param <R> Return type for the
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
+   * method
+   */
+  <T extends CoprocessorProtocol, R> void coprocessorExec(
+      Class<T> protocol, byte[] startKey, byte[] endKey,
+      Batch.Call<T,R> callable, Batch.Callback<R> callback)
+      throws IOException, Throwable;
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java?rev=1037026&r1=1037025&r2=1037026&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java Fri Nov 19 21:14:05 2010
@@ -37,10 +37,11 @@ import java.util.TreeMap;
  * Container for Actions (i.e. Get, Delete, or Put), which are grouped by
  * regionName. Intended to be used with HConnectionManager.processBatch()
  */
-public final class MultiAction implements Writable {
+public final class MultiAction<R> implements Writable {
 
   // map of regions to lists of puts/gets/deletes for that region.
-  public Map<byte[], List<Action>> actions = new TreeMap<byte[], List<Action>>(
+  public Map<byte[], List<Action<R>>> actions =
+    new TreeMap<byte[], List<Action<R>>>(
       Bytes.BYTES_COMPARATOR);
 
   public MultiAction() {
@@ -67,10 +68,10 @@ public final class MultiAction implement
    * @param regionName
    * @param a
    */
-  public void add(byte[] regionName, Action a) {
-    List<Action> rsActions = actions.get(regionName);
+  public void add(byte[] regionName, Action<R> a) {
+    List<Action<R>> rsActions = actions.get(regionName);
     if (rsActions == null) {
-      rsActions = new ArrayList<Action>();
+      rsActions = new ArrayList<Action<R>>();
       actions.put(regionName, rsActions);
     }
     rsActions.add(a);
@@ -83,9 +84,9 @@ public final class MultiAction implement
   /**
    * @return All actions from all regions in this container
    */
-  public List<Action> allActions() {
-    List<Action> res = new ArrayList<Action>();
-    for (List<Action> lst : actions.values()) {
+  public List<Action<R>> allActions() {
+    List<Action<R>> res = new ArrayList<Action<R>>();
+    for (List<Action<R>> lst : actions.values()) {
       res.addAll(lst);
     }
     return res;
@@ -94,12 +95,12 @@ public final class MultiAction implement
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(actions.size());
-    for (Map.Entry<byte[], List<Action>> e : actions.entrySet()) {
+    for (Map.Entry<byte[], List<Action<R>>> e : actions.entrySet()) {
       Bytes.writeByteArray(out, e.getKey());
-      List<Action> lst = e.getValue();
+      List<Action<R>> lst = e.getValue();
       out.writeInt(lst.size());
       for (Action a : lst) {
-        HbaseObjectWritable.writeObject(out, a, Action.class, null);
+        HbaseObjectWritable.writeObject(out, a, a.getClass(), null);
       }
     }
   }
@@ -111,7 +112,7 @@ public final class MultiAction implement
     for (int i = 0; i < mapSize; i++) {
       byte[] key = Bytes.readByteArray(in);
       int listSize = in.readInt();
-      List<Action> lst = new ArrayList<Action>(listSize);
+      List<Action<R>> lst = new ArrayList<Action<R>>(listSize);
       for (int j = 0; j < listSize; j++) {
         lst.add((Action) HbaseObjectWritable.readObject(in, null));
       }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1037026&r1=1037025&r2=1037026&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Fri Nov 19 21:14:05 2010
@@ -116,7 +116,8 @@ public class MultiResponse implements Wr
 
             if (! (obj instanceof Writable))
               obj = null; // squash all non-writables to null.
-            HbaseObjectWritable.writeObject(out, obj, Result.class, null);
+            HbaseObjectWritable.writeObject(out, r.getSecond(),
+                obj != null ? obj.getClass() : Writable.class, null);
           }
         }
       }

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java?rev=1037026&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java Fri Nov 19 21:14:05 2010
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import org.apache.commons.lang.reflect.MethodUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+
+/**
+ * A collection of interfaces and utilities used for interacting with custom RPC
+ * interfaces exposed by Coprocessors.
+ */
+public abstract class Batch {
+  private static Log LOG = LogFactory.getLog(Batch.class);
+
+  /**
+   * Creates a new {@link Batch.Call} instance that invokes a method
+   * with the given parameters and returns the result.
+   *
+   * <p>
+   * Note that currently the method is naively looked up using the method name
+   * and class types of the passed arguments, which means that
+   * <em>none of the arguments can be <code>null</code></em>.
+   * For more flexibility, see
+   * {@link Batch#forMethod(java.lang.reflect.Method, Object...)}.
+   * </p>
+   *
+   * @param protocol the protocol class being called
+   * @param method the method name
+   * @param args zero or more arguments to be passed to the method
+   * (individual args cannot be <code>null</code>!)
+   * @param <T> the class type of the protocol implementation being invoked
+   * @param <R> the return type for the method call
+   * @return a {@code Callable} instance that will invoke the given method
+   * and return the results
+   * @throws NoSuchMethodException if the method named, with the given argument
+   *     types, cannot be found in the protocol class
+   * @see Batch#forMethod(java.lang.reflect.Method, Object...)
+   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
+   */
+  public static <T extends CoprocessorProtocol,R> Call<T,R> forMethod(
+      final Class<T> protocol, final String method, final Object... args)
+  throws NoSuchMethodException {
+    Class[] types = new Class[args.length];
+    for (int i=0; i<args.length; i++) {
+      if (args[i] == null) {
+        throw new NullPointerException("Method argument cannot be null");
+      }
+      types[i] = args[i].getClass();
+    }
+
+    Method m = MethodUtils.getMatchingAccessibleMethod(protocol, method, types);
+    if (m == null) {
+      throw new NoSuchMethodException("No matching method found for '" +
+          method + "'");
+    }
+
+    m.setAccessible(true);
+    return forMethod(m, args);
+  }
+
+  /**
+   * Creates a new {@link Batch.Call} instance that invokes a method
+   * with the given parameters and returns the result.
+   *
+   * @param method the method reference to invoke
+   * @param args zero or more arguments to be passed to the method
+   * @param <T> the class type of the protocol implementation being invoked
+   * @param <R> the return type for the method call
+   * @return a {@code Callable} instance that will invoke the given method and
+   * return the results
+   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
+   */
+  public static <T extends CoprocessorProtocol,R> Call<T,R> forMethod(
+      final Method method, final Object... args) {
+    return new Call<T,R>() {
+        public R call(T instance) throws IOException {
+          try {
+            if (Proxy.isProxyClass(instance.getClass())) {
+              InvocationHandler invoker = Proxy.getInvocationHandler(instance);
+              return (R)invoker.invoke(instance, method, args);
+            } else {
+              LOG.warn("Non proxied invocation of method '"+method.getName()+"'!");
+              return (R)method.invoke(instance, args);
+            }
+          }
+          catch (IllegalAccessException iae) {
+            throw new IOException("Unable to invoke method '"+
+                method.getName()+"'", iae);
+          }
+          catch (InvocationTargetException ite) {
+            throw new IOException(ite.toString(), ite);
+          }
+          catch (Throwable t) {
+            throw new IOException(t.toString(), t);
+          }
+        }
+    };
+  }
+
+  /**
+   * Defines a unit of work to be executed.
+   *
+   * <p>
+   * When used with
+   * {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
+   * the implementation's {@link Batch.Call#call(Object)} method will be invoked
+   * with a proxy to the
+   * {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
+   * sub-type instance.
+   * </p>
+   * @see org.apache.hadoop.hbase.client.coprocessor
+   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
+   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
+   * @param <T> the instance type to be passed to
+   * {@link Batch.Call#call(Object)}
+   * @param <R> the return type from {@link Batch.Call#call(Object)}
+   */
+  public static interface Call<T,R> {
+    public R call(T instance) throws IOException;
+  }
+
+  /**
+   * Defines a generic callback to be triggered for each {@link Batch.Call#call(Object)}
+   * result.
+   *
+   * <p>
+   * When used with
+   * {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)},
+   * the implementation's {@link Batch.Callback#update(byte[], byte[], Object)}
+   * method will be called with the {@link Batch.Call#call(Object)} return value
+   * from each region in the selected range.
+   * </p>
+   * @param <R> the return type from the associated {@link Batch.Call#call(Object)}
+   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
+   */
+  public static interface Callback<R> {
+    public void update(byte[] region, byte[] row, R result);
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java?rev=1037026&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java Fri Nov 19 21:14:05 2010
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.ipc.Invocation;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+/**
+ * Represents an arbitrary method invocation against a Coprocessor
+ * instance.  In order for a coprocessor implementation to be remotely callable
+ * by clients, it must define and implement a {@link CoprocessorProtocol}
+ * subclass.  Only methods defined in the {@code CoprocessorProtocol} interface
+ * will be callable by clients.
+ *
+ * <p>
+ * This class is used internally by
+ * {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
+ * to wrap the {@code CoprocessorProtocol} method invocations requested in
+ * RPC calls.  It should not be used directly by HBase clients.
+ * </p>
+ *
+ * @see ExecResult
+ * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
+ * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
+ */
+public class Exec extends Invocation implements Row {
+  private Configuration conf = HBaseConfiguration.create();
+  /** Row key used as a reference for any region lookups */
+  private byte[] referenceRow;
+  private Class<? extends CoprocessorProtocol> protocol;
+
+  public Exec() {
+  }
+
+  public Exec(Configuration configuration,
+      byte[] row,
+      Class<? extends CoprocessorProtocol> protocol,
+      Method method, Object[] parameters) {
+    super(method, parameters);
+    this.conf = configuration;
+    this.referenceRow = row;
+    this.protocol = protocol;
+  }
+
+  public Class<? extends CoprocessorProtocol> getProtocol() {
+    return protocol;
+  }
+
+  public byte[] getRow() {
+    return referenceRow;
+  }
+
+  public int compareTo(Row row) {
+    return Bytes.compareTo(referenceRow, row.getRow());
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Bytes.writeByteArray(out, referenceRow);
+    out.writeUTF(protocol.getName());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    referenceRow = Bytes.readByteArray(in);
+    String protocolName = in.readUTF();
+    try {
+      protocol = (Class<CoprocessorProtocol>)conf.getClassByName(protocolName);
+    }
+    catch (ClassNotFoundException cnfe) {
+      throw new IOException("Protocol class "+protocolName+" not found", cnfe);
+    }
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java?rev=1037026&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java Fri Nov 19 21:14:05 2010
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents the return value from a
+ * {@link org.apache.hadoop.hbase.client.coprocessor.Exec} invocation.
+ * This simply wraps the value for easier
+ * {@link org.apache.hadoop.hbase.io.HbaseObjectWritable}
+ * serialization.
+ *
+ * <p>
+ * This class is used internally by the HBase client code to properly serialize
+ * responses from {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
+ * method invocations.  It should not be used directly by clients.
+ * </p>
+ *
+ * @see Exec
+ * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
+ * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
+ */
+public class ExecResult implements Writable {
+  private byte[] regionName;
+  private Class<?> valueType;
+  private Object value;
+
+  public ExecResult() {
+  }
+
+  public ExecResult(byte[] region, Class<?> valueType, Object value) {
+    this.regionName = region;
+    this.valueType = valueType;
+    this.value = value;
+  }
+
+  public byte[] getRegionName() {
+    return regionName;
+  }
+
+  public Object getValue() {
+    return value;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, regionName);
+    HbaseObjectWritable.writeObject(out, value,
+        (valueType != null ? valueType : Writable.class), null);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    regionName = Bytes.readByteArray(in);
+    value = HbaseObjectWritable.readObject(in, null);
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/package-info.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/package-info.java?rev=1037026&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/package-info.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/package-info.java Fri Nov 19 21:14:05 2010
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+Provides client classes for invoking Coprocessor RPC protocols
+
+<p>
+<ul>
+ <li><a href="#overview">Overview</a></li>
+ <li><a href="#usage">Example Usage</a></li>
+</ul>
+</p>
+
+<h2><a name="overview">Overview</a></h2>
+<p>
+The coprocessor framework provides a way for custom code to run in place on the
+HBase region servers with each of a table's regions.  These client classes
+enable applications to communicate with coprocessor instances via custom RPC
+protocols.
+</p>
+
+<p>
+In order to provide a custom RPC protocol to clients, a coprocessor implementation
+defines an interface that extends {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}.
+The interface can define any methods that the coprocessor wishes to expose.
+Using this protocol, you can communicate with the coprocessor instances via
+the {@link org.apache.hadoop.hbase.client.HTable#coprocessorProxy(Class, byte[])} and
+{@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
+methods.
+</p>
+
+<p>
+Since {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol} instances are
+associated with individual regions within the table, the client RPC calls
+must ultimately identify which regions should be used in the <code>CoprocessorProtocol</code>
+method invocations.  Since regions are seldom handled directly in client code
+and the region names may change over time, the coprocessor RPC calls use row keys
+to identify which regions should be used for the method invocations.  Clients
+can call <code>CoprocessorProtocol</code> methods against either:
+<ul>
+ <li><strong>a single region</strong> - calling
+   {@link org.apache.hadoop.hbase.client.HTable#coprocessorProxy(Class, byte[])}
+   with a single row key.  This returns a dynamic proxy of the <code>CoprocessorProtocol</code>
+   interface which uses the region containing the given row key (even if the
+   row does not exist) as the RPC endpoint.</li>
+ <li><strong>a range of regions</strong> - calling
+   {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
+   with a starting row key and an ending row key.  All regions in the table
+   from the region containing the start row key to the region containing the end
+   row key (inclusive), will be used as the RPC endpoints.</li>
+</ul>
+</p>
+
+<p><em>Note that the row keys passed as parameters to the <code>HTable</code>
+methods are not passed to the <code>CoprocessorProtocol</code> implementations.
+They are only used to identify the regions for endpoints of the remote calls.
+</em></p>
+
+<p>
+The {@link org.apache.hadoop.hbase.client.coprocessor.Batch} class defines two
+interfaces used for <code>CoprocessorProtocol</code> invocations against
+multiple regions.  Clients implement {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} to
+call methods of the actual <code>CoprocessorProtocol</code> instance.  The interface's
+<code>call()</code> method will be called once per selected region, passing the
+<code>CoprocessorProtocol</code> instance for the region as a parameter.  Clients
+can optionally implement {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback}
+to be notified of the results from each region invocation as they complete.
+The instance's {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
+method will be called with the {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
+return value from each region.
+</p>
+
+<h2><a name="usage">Example usage</a></h2>
+<p>
+To start with, let's use a fictitious coprocessor, <code>RowCountCoprocessor</code>
+that counts the number of rows and key-values in each region where it is running.
+For clients to query this information, the coprocessor defines and implements
+the following {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol} extension
+interface:
+</p>
+
+<div style="background-color: #cccccc; padding: 2px">
+<blockquote><pre>
+public interface RowCountProtocol extends CoprocessorProtocol {
+  long getRowCount();
+  long getRowCount(Filter filt);
+  long getKeyValueCount();
+}
+</pre></blockquote></div>
+
+<p>
+Now we need a way to access the results that <code>RowCountCoprocessor</code>
+is making available.  If we want to find the row count for all regions, we could
+use:
+</p>
+
+<div style="background-color: #cccccc; padding: 2px">
+<blockquote><pre>
+HTable table = new HTable("mytable");
+// find row count keyed by region name
+Map<byte[],Long> results = table.coprocessorExec(
+    RowCountProtocol.class, // the protocol interface we're invoking
+    null, null,             // start and end row keys
+    new Batch.Call<RowCountProtocol,Long>() {
+       public Long call(RowCountProtocol counter) {
+         return counter.getRowCount();
+       }
+     });
+</pre></blockquote></div>
+
+<p>
+This will return a <code>java.util.Map</code> of the <code>counter.getRowCount()</code>
+result for the <code>RowCountCoprocessor</code> instance running in each region
+of <code>mytable</code>, keyed by the region name.
+</p>
+
+<p>
+By implementing {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
+as an anonymous class, we can invoke <code>RowCountProtocol</code> methods
+directly against the {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
+method's argument.  Calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}
+will take care of invoking <code>Batch.Call.call()</code> against our anonymous class
+with the <code>RowCountCoprocessor</code> instance for each table region.
+</p>
+
+<p>
+For this simple case, where we only want to obtain the result from a single
+<code>CoprocessorProtocol</code> method, there's also a bit of syntactic sugar
+we can use to cut down on the amount of code required:
+</p>
+
+<div style="background-color: #cccccc; padding: 2px">
+<blockquote><pre>
+HTable table = new HTable("mytable");
+Batch.Call<RowCountProtocol,Long> call = Batch.forMethod(RowCountProtocol.class, "getRowCount");
+Map<byte[],Long> results = table.coprocessorExec(RowCountProtocol.class, null, null, call);
+</pre></blockquote></div>
+
+<p>
+{@link org.apache.hadoop.hbase.client.coprocessor.Batch#forMethod(Class, String, Object...)}
+is a simple factory method that will return a {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
+instance that will call <code>RowCountProtocol.getRowCount()</code> for us
+using reflection.
+</p>
+
+<p>
+However, if you want to perform additional processing on the results,
+implementing {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
+directly will provide more power and flexibility.  For example, if you would
+like to combine row count and key-value count for each region:
+</p>
+
+<div style="background-color: #cccccc; padding: 2px">
+<blockquote><pre>
+HTable table = new HTable("mytable");
+// combine row count and kv count for region
+Map<byte[],Pair<Long,Long>> results = table.coprocessorExec(
+    RowCountProtocol.class,
+    null, null,
+    new Batch.Call<RowCountProtocol,Pair<Long,Long>>() {
+        public Pair<Long,Long> call(RowCountProtocol counter) {
+          return new Pair(counter.getRowCount(), counter.getKeyValueCount());
+        }
+    });
+</pre></blockquote></div>
+
+<p>
+Similarly, you could average the number of key-values per row for each region:
+</p>
+
+<div style="background-color: #cccccc; padding: 2px">
+<blockquote><pre>
+Map<byte[],Double> results = table.coprocessorExec(
+    RowCountProtocol.class,
+    null, null,
+    new Batch.Call<RowCountProtocol,Double>() {
+        public Double call(RowCountProtocol counter) {
+          return ((double)counter.getKeyValueCount()) / ((double)counter.getRowCount());
+        }
+    });
+</pre></blockquote></div>
+*/
+package org.apache.hadoop.hbase.client.coprocessor;
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1037026&r1=1037025&r2=1037026&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Fri Nov 19 21:14:05 2010
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Exec;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
@@ -200,6 +201,8 @@ public class HbaseObjectWritable impleme
     addToMap(MultiAction.class, code++);
     addToMap(MultiResponse.class, code++);
 
+    // coprocessor execution
+    addToMap(Exec.class, code++);
     addToMap(Increment.class, code++);
 
     addToMap(KeyOnlyFilter.class, code++);

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java?rev=1037026&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java Fri Nov 19 21:14:05 2010
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * The IPC connection header sent by the client to the server
+ * on connection establishment.
+ */
+class ConnectionHeader implements Writable {
+  private String protocol;
+  private UserGroupInformation ugi = null;
+
+  public ConnectionHeader() {}
+
+  /**
+   * Create a new {@link ConnectionHeader} with the given <code>protocol</code>
+   * and {@link UserGroupInformation}.
+   * @param protocol protocol used for communication between the IPC client
+   *                 and the server
+   * @param ugi {@link UserGroupInformation} of the client communicating with
+   *            the server
+   */
+  public ConnectionHeader(String protocol, UserGroupInformation ugi) {
+    this.protocol = protocol;
+    this.ugi = ugi;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    protocol = Text.readString(in);
+    if (protocol.isEmpty()) {
+      protocol = null;
+    }
+
+    boolean ugiUsernamePresent = in.readBoolean();
+    if (ugiUsernamePresent) {
+      String username = in.readUTF();
+      ugi.readFields(in);
+    } else {
+      ugi = null;
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, (protocol == null) ? "" : protocol);
+    if (ugi != null) {
+      //Send both effective user and real user for simple auth
+      out.writeBoolean(true);
+      out.writeUTF(ugi.getUserName());
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public UserGroupInformation getUgi() {
+    return ugi;
+  }
+
+  public String toString() {
+    return protocol + "-" + ugi;
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorProtocol.java?rev=1037026&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorProtocol.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorProtocol.java Fri Nov 19 21:14:05 2010
@@ -0,0 +1,40 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * All custom RPC protocols to be exported by Coprocessors must extend this interface.
+ *
+ * <p>
+ * <strong>Note that all callable methods must have a return type handled by
+ * {@link org.apache.hadoop.hbase.io.HbaseObjectWritable#writeObject(java.io.DataOutput, Object, Class, org.apache.hadoop.conf.Configuration)}.</strong>
+ * That is:
+ * <ul>
+ *   <li>a Java primitive type ({@code int}, {@code float}, etc)</li>
+ *   <li>a Java {@code String}</li>
+ *   <li>a {@link org.apache.hadoop.io.Writable}</li>
+ *   <li>an array or {@code java.util.List} of one of the above</li>
+ * </ul>
+ * </p>
+ */
+public interface CoprocessorProtocol extends VersionedProtocol {
+  // Marker interface: declares no methods of its own.  Implementations
+  // inherit getProtocolVersion(String, long) from VersionedProtocol and
+  // expose their own callable methods subject to the return-type rules above.
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java?rev=1037026&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java Fri Nov 19 21:14:05 2010
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.coprocessor.Exec;
+import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+/**
+ * Backs a {@link CoprocessorProtocol} subclass proxy and forwards method
+ * invocations for server execution.  Note that internally this will issue a
+ * separate RPC call for each method invocation (using a
+ * {@link org.apache.hadoop.hbase.client.ServerCallable} instance).
+ */
+public class ExecRPCInvoker implements InvocationHandler {
+  private static final Log LOG = LogFactory.getLog(ExecRPCInvoker.class);
+
+  private final Configuration conf;
+  private final HConnection connection;
+  private final Class<? extends CoprocessorProtocol> protocol;
+  private final byte[] table;
+  private final byte[] row;
+  // Region that served the most recent invocation; null until the first
+  // call completes.
+  private byte[] regionName;
+
+  /**
+   * @param conf configuration used when constructing the {@link Exec} call
+   * @param connection connection used to locate the region and issue the RPC
+   * @param protocol the {@link CoprocessorProtocol} sub-interface being proxied
+   * @param table table containing the target region
+   * @param row row key used to select the target region
+   */
+  public ExecRPCInvoker(Configuration conf,
+      HConnection connection,
+      Class<? extends CoprocessorProtocol> protocol,
+      byte[] table,
+      byte[] row) {
+    this.conf = conf;
+    this.connection = connection;
+    this.protocol = protocol;
+    this.table = table;
+    this.row = row;
+  }
+
+  @Override
+  public Object invoke(Object instance, final Method method, final Object[] args)
+      throws Throwable {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Call: "+method.getName()+", "+(args != null ? args.length : 0));
+    }
+
+    if (row != null) {
+      final Exec exec = new Exec(conf, row, protocol, method, args);
+      ServerCallable<ExecResult> callable =
+          new ServerCallable<ExecResult>(connection, table, row) {
+            public ExecResult call() throws Exception {
+              return server.execCoprocessor(location.getRegionInfo().getRegionName(),
+                  exec);
+            }
+          };
+      ExecResult result = connection.getRegionServerWithRetries(callable);
+      this.regionName = result.getRegionName();
+      // Guard the debug log so Bytes.toStringBinary() and the string
+      // concatenation are not paid on every call, matching the guard above.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
+            ", value="+result.getValue());
+      }
+      return result.getValue();
+    }
+
+    // No row supplied: there is no region to route the call to, so the
+    // invocation is dropped and the proxied method returns null.
+    return null;
+  }
+
+  /** @return name of the region that served the most recent invocation */
+  public byte[] getRegionName() {
+    return regionName;
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1037026&r1=1037025&r2=1037026&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Fri Nov 19 21:14:05 2010
@@ -26,10 +26,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -182,6 +182,7 @@ public class HBaseClient {
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
   private class Connection extends Thread {
+    private ConnectionHeader header;              // connection header
     private ConnectionId remoteId;
     private Socket socket = null;                 // connected socket
     private DataInputStream in;
@@ -193,10 +194,6 @@ public class HBaseClient {
     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
     private IOException closeException; // close reason
 
-    public Connection(InetSocketAddress address) throws IOException {
-      this(new ConnectionId(address, null, 0));
-    }
-
     public Connection(ConnectionId remoteId) throws IOException {
       if (remoteId.getAddress().isUnresolved()) {
         throw new UnknownHostException("unknown host: " +
@@ -204,6 +201,11 @@ public class HBaseClient {
       }
       this.remoteId = remoteId;
       UserGroupInformation ticket = remoteId.getTicket();
+      Class<? extends VersionedProtocol> protocol = remoteId.getProtocol();
+
+      header = new ConnectionHeader(
+          protocol == null ? null : protocol.getName(), ticket);
+
       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
         remoteId.getAddress().toString() +
         ((ticket==null)?" from an unknown user": (" from " + ticket.getUserName())));
@@ -390,8 +392,8 @@ public class HBaseClient {
       out.write(HBaseServer.CURRENT_VERSION);
       //When there are more fields we can have ConnectionHeader Writable.
       DataOutputBuffer buf = new DataOutputBuffer();
-      ObjectWritable.writeObject(buf, remoteId.getTicket(),
-                                 UserGroupInformation.class, conf);
+      header.write(buf);
+
       int bufLen = buf.getLength();
       out.writeInt(bufLen);
       out.write(buf.getData(), 0, bufLen);
@@ -721,15 +723,27 @@ public class HBaseClient {
    * @throws IOException e
    */
   public Writable call(Writable param, InetSocketAddress address)
-  throws IOException {
+  throws IOException, InterruptedException {
       return call(param, address, null, 0);
   }
 
   public Writable call(Writable param, InetSocketAddress addr,
                        UserGroupInformation ticket, int rpcTimeout)
-                       throws IOException {
+                       throws IOException, InterruptedException {
+    return call(param, addr, null, ticket, rpcTimeout);
+  }
+
+  /** Make a call, passing <code>param</code>, to the IPC server running at
+   * <code>address</code> which is servicing the <code>protocol</code> protocol,
+   * with the <code>ticket</code> credentials, returning the value.
+   * Throws exceptions if there are network problems or if the remote code
+   * threw an exception. */
+  public Writable call(Writable param, InetSocketAddress addr,
+                       Class<? extends VersionedProtocol> protocol,
+                       UserGroupInformation ticket, int rpcTimeout)
+      throws InterruptedException, IOException {
     Call call = new Call(param);
-    Connection connection = getConnection(addr, ticket, rpcTimeout, call);
+    Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
@@ -800,9 +814,22 @@ public class HBaseClient {
    * @param addresses socket addresses
    * @return  Writable[]
    * @throws IOException e
+   * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, UserGroupInformation)} instead
    */
+  @Deprecated
   public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
-    throws IOException {
+    throws IOException, InterruptedException {
+    return call(params, addresses, null, null);
+  }
+
+  /** Makes a set of calls in parallel.  Each parameter is sent to the
+   * corresponding address.  When all values are available, or have timed out
+   * or errored, the collected results are returned in an array.  The array
+   * contains nulls for calls that timed out or errored.  */
+  public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
+                         Class<? extends VersionedProtocol> protocol,
+                         UserGroupInformation ticket)
+      throws IOException, InterruptedException {
     if (addresses.length == 0) return new Writable[0];
 
     ParallelResults results = new ParallelResults(params.length);
@@ -812,7 +839,8 @@ public class HBaseClient {
       for (int i = 0; i < params.length; i++) {
         ParallelCall call = new ParallelCall(params[i], results, i);
         try {
-          Connection connection = getConnection(addresses[i], null, 0, call);
+          Connection connection =
+              getConnection(addresses[i], protocol, ticket, 0, call);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
           // log errors
@@ -834,6 +862,7 @@ public class HBaseClient {
   /* Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given host/port are reused. */
   private Connection getConnection(InetSocketAddress addr,
+                                   Class<? extends VersionedProtocol> protocol,
                                    UserGroupInformation ticket,
                                    int rpcTimeout,
                                    Call call)
@@ -847,7 +876,7 @@ public class HBaseClient {
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
-    ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout);
+    ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
     do {
       synchronized (connections) {
         connection = connections.get(remoteId);
@@ -874,9 +903,14 @@ public class HBaseClient {
     final InetSocketAddress address;
     final UserGroupInformation ticket;
     final private int rpcTimeout;
+    Class<? extends VersionedProtocol> protocol;
+    private static final int PRIME = 16777619;
 
-    ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
+    ConnectionId(InetSocketAddress address,
+        Class<? extends VersionedProtocol> protocol,
+        UserGroupInformation ticket,
         int rpcTimeout) {
+      this.protocol = protocol;
       this.address = address;
       this.ticket = ticket;
       this.rpcTimeout = rpcTimeout;
@@ -885,6 +919,11 @@ public class HBaseClient {
     InetSocketAddress getAddress() {
       return address;
     }
+
+    Class<? extends VersionedProtocol> getProtocol() {
+      return protocol;
+    }
+
     UserGroupInformation getTicket() {
       return ticket;
     }
@@ -893,16 +932,19 @@ public class HBaseClient {
     public boolean equals(Object obj) {
      if (obj instanceof ConnectionId) {
        ConnectionId id = (ConnectionId) obj;
-       return address.equals(id.address) && ticket == id.ticket && 
-       rpcTimeout == id.rpcTimeout;
+       return address.equals(id.address) && protocol == id.protocol &&
+           ticket == id.ticket && rpcTimeout == id.rpcTimeout;
        //Note : ticket is a ref comparision.
      }
      return false;
     }
 
-    @Override
+    @Override  // simply use the default Object#hashcode() ?
     public int hashCode() {
-      return address.hashCode() ^ System.identityHashCode(ticket) ^ rpcTimeout;
+      return (address.hashCode() + PRIME * (
+                  PRIME * System.identityHashCode(protocol) ^
+                  System.identityHashCode(ticket)
+                )) ^ rpcTimeout;
     }
   }
 }



Mime
View raw message