hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1356924 [1/2] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/had...
Date Tue, 03 Jul 2012 20:41:07 GMT
Author: mbautin
Date: Tue Jul  3 20:41:02 2012
New Revision: 1356924

URL: http://svn.apache.org/viewvc?rev=1356924&view=rev
Log:
[HBASE-6215] Per-request profiling

Author: aurickq

Summary:
1. Ability to switch on profiling using the HTable interface, which will ask the server to send back profiling data with each RPC call.

2. Ability to set a request tag at the HTable level, so the server may aggregate stats based on these tags.

3. Bumped RPC version to support profiling and new HBaseRPCOptions object. RPC compression and profiling now use this object.

4. HBase shell commands to get profiling data, and a load tester argument to switch on profiling for X% of reads/writes.

Test Plan: New unit test for per-request profiling. Ran unit tests on MR servers, interop testing between old server/new client, new server/old client.

Reviewers: kannan, kranganathan

Reviewed By: kranganathan

CC: hbase-eng@, tao-diffs@lists

Differential Revision: https://phabricator.fb.com/D495908

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ProfilingData.java
    hbase/branches/0.89-fb/src/main/ruby/shell/commands/get_profiling.rb
    hbase/branches/0.89-fb/src/main/ruby/shell/commands/set_profiling.rb
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/ipc/TestPerRequestProfiling.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/branches/0.89-fb/src/main/ruby/hbase/table.rb
    hbase/branches/0.89-fb/src/main/ruby/shell.rb
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Tue Jul  3 20:41:02 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionLo
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
@@ -142,6 +143,28 @@ public interface HConnection extends Clo
   /**
    * Establishes a connection to the region server at the specified address.
    * @param regionServer - the server to connect to
+   * @param options - ipc options to use
+   * @return proxy for HRegionServer
+   * @throws IOException if a remote or network exception occurs
+   */
+  public HRegionInterface getHRegionConnection(HServerAddress regionServer, HBaseRPCOptions options)
+  throws IOException;
+
+  /**
+   * Establishes a connection to the region server at the specified address.
+   * @param regionServer - the server to connect to
+   * @param getMaster - do we check if master is alive
+   * @param options - ipc options to use
+   * @return proxy for HRegionServer
+   * @throws IOException if a remote or network exception occurs
+   */
+  public HRegionInterface getHRegionConnection(
+      HServerAddress regionServer, boolean getMaster, HBaseRPCOptions options)
+  throws IOException;
+
+  /**
+   * Establishes a connection to the region server at the specified address.
+   * @param regionServer - the server to connect to
    * @return proxy for HRegionServer
    * @throws IOException if a remote or network exception occurs
    */
@@ -204,39 +227,44 @@ public interface HConnection extends Clo
    *          A batch of Gets to process.
    * @param tableName
    *          The name of the table
+   * @param options
+   *          RPC options object
    * @return Count of committed Puts. On fault, < list.size().
    * @throws IOException
    *           if a remote or network exception occurs
    */
   public Result[] processBatchOfGets(List<Get> actions,
-      final byte[] tableName)
+      final byte[] tableName, final HBaseRPCOptions options)
  throws IOException;
 
   /**
    * Process a batch of Puts. Does the retries.
    * @param list A batch of Puts to process.
    * @param tableName The name of the table
+   * @param options ipc options
    * @return Count of committed Puts.  On fault, < list.size().
    * @throws IOException if a remote or network exception occurs
    */
-  public int processBatchOfRows(ArrayList<Put> list, byte[] tableName)
+  public int processBatchOfRows(ArrayList<Put> list, byte[] tableName, HBaseRPCOptions options)
   throws IOException;
 
   /**
    * Process a batch of Deletes. Does the retries.
    * @param list A batch of Deletes to process.
-   * @return Count of committed Deletes. On fault, < list.size().
    * @param tableName The name of the table
+   * @param options ipc options
+   * @return Count of committed Deletes. On fault, < list.size().
    * @throws IOException if a remote or network exception occurs
    */
-  public int processBatchOfDeletes(List<Delete> list, byte[] tableName)
+  public int processBatchOfDeletes(List<Delete> list, byte[] tableName, 
+      final HBaseRPCOptions options)
   throws IOException;
 
-  public void processBatchOfPuts(List<Put> list, final byte[] tableName)
+  public void processBatchOfPuts(List<Put> list, final byte[] tableName, HBaseRPCOptions options)
   throws IOException;
 
     public int processBatchOfRowMutations(final List<RowMutations> list,
-      final byte[] tableName)
+      final byte[] tableName, HBaseRPCOptions options)
     throws IOException;
 
   /**
@@ -250,7 +278,7 @@ public interface HConnection extends Clo
    * @return the list of failed put among the MultiPut request, otherwise return null 
    *         if all puts are sent to the HRegionServer successfully.
    */
-  public List<Put> processSingleMultiPut(MultiPut mput);
+  public List<Put> processSingleMultiPut(MultiPut mput, HBaseRPCOptions options);
   
   /**
    * Delete the cached location

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Jul  3 20:41:02 2012
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
@@ -90,7 +91,7 @@ public class HConnectionManager {
     Runtime.getRuntime().addShutdownHook(new Thread("HCM.shutdownHook") {
       @Override
       public void run() {
-        HConnectionManager.deleteAllConnections(true);
+        HConnectionManager.deleteAllConnections();
       }
     });
   }
@@ -153,7 +154,7 @@ public class HConnectionManager {
       Integer key = HBaseConfiguration.hashCode(conf);
       TableServers t = HBASE_INSTANCES.remove(key);
       if (t != null) {
-        t.close(stopProxy);
+        t.close();
       }
     }
   }
@@ -162,13 +163,14 @@ public class HConnectionManager {
    * Delete information for all connections.
    * @param stopProxy stop the proxy as well
    */
-  public static void deleteAllConnections(boolean stopProxy) {
+  public static void deleteAllConnections() {
     synchronized (HBASE_INSTANCES) {
       for (TableServers t : HBASE_INSTANCES.values()) {
         if (t != null) {
-          t.close(stopProxy);
+          t.close();
         }
       }
+      HBaseRPC.stopClients ();
     }
     synchronized (ZK_WRAPPERS) {
       for (ClientZKConnection connection : ZK_WRAPPERS.values()) {
@@ -363,11 +365,6 @@ public class HConnectionManager {
 
     private volatile Configuration conf;
 
-    // Known region HServerAddress.toString() -> HRegionInterface
-    private final Map<String, HRegionInterface> servers =
-      new ConcurrentHashMap<String, HRegionInterface>();
-    private final ConcurrentHashMap<String, String> connectionLock = new ConcurrentHashMap<String, String>();
-
     // Used by master and region servers during safe mode only
     private volatile HRegionLocation rootRegionLocation;
 
@@ -470,7 +467,7 @@ public class HConnectionManager {
               HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
                   HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
                   masterLocation.getInetSocketAddress(), this.conf,
-                  (int)this.rpcTimeout);
+                  (int)this.rpcTimeout, HBaseRPCOptions.DEFAULT);
 
               if (tryMaster.isMasterRunning()) {
                 this.master = tryMaster;
@@ -651,7 +648,8 @@ public class HConnectionManager {
       scan.setCaching(rows);
       ScannerCallable s = new ScannerCallable(this,
           (Bytes.equals(tableName, HConstants.META_TABLE_NAME) ?
-              HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME), scan);
+              HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME), 
+              scan, HBaseRPCOptions.DEFAULT);
       try {
         // Open scanner
         getRegionServerWithRetries(s);
@@ -1139,42 +1137,41 @@ public class HConnectionManager {
     }
 
     public HRegionInterface getHRegionConnection(
-        HServerAddress regionServer, boolean getMaster)
+        HServerAddress regionServer, boolean getMaster, HBaseRPCOptions options)
     throws IOException {
       if (getMaster) {
         getMaster();
       }
       HRegionInterface server;
-      String rsName = regionServer.toString();
-      // See if we already have a connection (common case)
-      server = this.servers.get(rsName);
-      if (server == null) {
-        // create a unique lock for this RS (if necessary)
-        this.connectionLock.putIfAbsent(rsName, rsName);
-        // get the RS lock
-        synchronized (this.connectionLock.get(rsName)) {
-          // do one more lookup in case we were stalled above
-          server = this.servers.get(rsName);
-          if (server == null) {
-            try {
-              // definitely a cache miss. establish an RPC for this RS
-              // set hbase.ipc.client.connect.max.retries to retry connection
-              // attempts
-              server = (HRegionInterface) HBaseRPC.getProxy(
-                  serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
-                  regionServer.getInetSocketAddress(), this.conf,
-                  this.rpcTimeout);
-              this.servers.put(rsName, server);
-            } catch (RemoteException e) {
-              throw RemoteExceptionHandler.decodeRemoteException(e);
-            }
-          }
-        }
+      
+      try {
+        // establish an RPC for this RS
+        // set hbase.ipc.client.connect.max.retries to retry connection
+        // attempts
+        server = (HRegionInterface) HBaseRPC.getProxy(
+            serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
+            regionServer.getInetSocketAddress(), this.conf,
+            this.rpcTimeout, options);
+      } catch (RemoteException e) {
+        throw RemoteExceptionHandler.decodeRemoteException(e);
       }
+
       return server;
     }
 
     public HRegionInterface getHRegionConnection(
+        HServerAddress regionServer, HBaseRPCOptions options)
+    throws IOException {
+      return getHRegionConnection(regionServer, false, options);
+    }
+
+    public HRegionInterface getHRegionConnection(
+        HServerAddress regionServer, boolean getMaster)
+    throws IOException {
+      return getHRegionConnection(regionServer, getMaster, HBaseRPCOptions.DEFAULT);
+    }
+
+    public HRegionInterface getHRegionConnection(
         HServerAddress regionServer)
     throws IOException {
       return getHRegionConnection(regionServer, false);
@@ -1520,7 +1517,7 @@ public class HConnectionManager {
     }
 
     public int processBatchOfRows(final ArrayList<Put> list,
-      final byte[] tableName)
+      final byte[] tableName, final HBaseRPCOptions options)
     throws IOException {
       if (list.isEmpty()) return 0;
       Batch<Object> b = new Batch<Object>(this) {
@@ -1531,7 +1528,7 @@ public class HConnectionManager {
         throws IOException, RuntimeException {
           final List<Put> puts = (List<Put>)currentList;
           return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
-              tableName, row) {
+              tableName, row, options) {
             public Integer call() throws IOException {
               return server.put(location.getRegionInfo().getRegionName(), puts);
             }
@@ -1542,7 +1539,7 @@ public class HConnectionManager {
     }
 
     public Result[] processBatchOfGets(final List<Get> list,
-        final byte[] tableName)
+        final byte[] tableName, final HBaseRPCOptions options)
         throws IOException {
       if (list.isEmpty()) {
         return null;
@@ -1558,7 +1555,7 @@ public class HConnectionManager {
             RuntimeException {
           final List<Get> gets = (List<Get>) currentList;
           Result[] tmp = getRegionServerWithRetries(new ServerCallable<Result[]>(
-              this.c, tableName, row) {
+              this.c, tableName, row, options) {
             public Result[] call() throws IOException {
               return server.get(location.getRegionInfo().getRegionName(), gets);
             }
@@ -1575,7 +1572,7 @@ public class HConnectionManager {
     }
 
     public int processBatchOfRowMutations(final List<RowMutations> list,
-      final byte[] tableName)
+      final byte[] tableName, final HBaseRPCOptions options)
     throws IOException {
       if (list.isEmpty()) return 0;
       Batch<Object> b = new Batch<Object>(this) {
@@ -1586,7 +1583,7 @@ public class HConnectionManager {
         throws IOException, RuntimeException {
           final List<RowMutations> mutations = (List<RowMutations>)currentList;
           getRegionServerWithRetries(new ServerCallable<Void>(this.c,
-                tableName, row) {
+                tableName, row, options) {
               public Void call() throws IOException {
                 server.mutateRow(location.getRegionInfo().getRegionName(),
                   mutations);
@@ -1600,7 +1597,7 @@ public class HConnectionManager {
       }
 
     public int processBatchOfDeletes(final List<Delete> list,
-      final byte[] tableName)
+      final byte[] tableName, final HBaseRPCOptions options)
     throws IOException {
       if (list.isEmpty()) return 0;
       Batch<Object> b = new Batch<Object>(this) {
@@ -1611,7 +1608,7 @@ public class HConnectionManager {
         throws IOException, RuntimeException {
           final List<Delete> deletes = (List<Delete>)currentList;
           return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
-                tableName, row) {
+                tableName, row, options) {
               public Integer call() throws IOException {
                 return server.delete(location.getRegionInfo().getRegionName(),
                   deletes);
@@ -1622,23 +1619,11 @@ public class HConnectionManager {
       return b.process(list, tableName, new Object());
       }
 
-    void close(boolean stopProxy) {
+    public void close() {
       if (master != null) {
-        if (stopProxy) {
-          HBaseRPC.stopProxy(master);
-        }
         master = null;
         masterChecked = false;
       }
-      if (stopProxy) {
-        for (HRegionInterface i: servers.values()) {
-          HBaseRPC.stopProxy(i);
-        }
-      }
-    }
-
-    public void close() {
-      close(true);
     }
 
     /**
@@ -1650,7 +1635,7 @@ public class HConnectionManager {
      * @throws IOException
      */
     private Throwable processSinglePut(Put put,
-        List<Put> failed, final byte[] tableName) throws IOException {
+        List<Put> failed, final byte[] tableName, HBaseRPCOptions options) throws IOException {
       // XXX error handling should mirror getRegionServerWithRetries()
       // Get server address
       byte [] row = put.getRow();
@@ -1664,7 +1649,7 @@ public class HConnectionManager {
 
       // Create the multiPutCallable
       Callable<MultiPutResponse> multiPutCallable =
-        createPutCallable(multiPut.address, multiPut, tableName);
+        createPutCallable(multiPut.address, multiPut, tableName, options);
 
       try {
         // Get the MultiPutResponse
@@ -1700,7 +1685,7 @@ public class HConnectionManager {
      * @throws IOException
      */
     private void processBatchOfMultiPut(List<Put> list,
-        List<Put> failed, final byte[] tableName) throws IOException {
+        List<Put> failed, final byte[] tableName, HBaseRPCOptions options) throws IOException {
       // XXX error handling should mirror getRegionServerWithRetries()
       Collections.sort(list);
       Map<HServerAddress, MultiPut> regionPuts =
@@ -1731,7 +1716,7 @@ public class HConnectionManager {
           new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
       for ( MultiPut put : multiPuts ) {
         futures.add(HTable.multiPutThreadPool.submit(
-            createPutCallable(put.address, put, tableName)));
+            createPutCallable(put.address, put, tableName, options)));
       }
 
       // step 3:
@@ -1776,13 +1761,13 @@ public class HConnectionManager {
     }
 
     /** {@inheritDoc} */
-    public List<Put> processSingleMultiPut(MultiPut mput) {
+    public List<Put> processSingleMultiPut(MultiPut mput, HBaseRPCOptions options) {
       if (mput == null || mput.address == null)
         return null;
       List<Put> failedPuts = null;
 
       Future<MultiPutResponse> future = HTable.multiPutThreadPool.submit(
-          createPutCallable(mput.address, mput, null));
+          createPutCallable(mput.address, mput, null, options));
 
       try {
         MultiPutResponse resp = future.get();
@@ -1831,16 +1816,16 @@ public class HConnectionManager {
      *  - Otherwise, we throw a generic exception indicating that an error occurred.
      *    The 'list' parameter is mutated to contain those puts that did not succeed.
      */
-    public void processBatchOfPuts(List<Put> list, final byte[] tableName)
+    public void processBatchOfPuts(List<Put> list, final byte[] tableName, HBaseRPCOptions options)
     throws IOException {
       boolean singletonList = list.size() == 1;
       Throwable singleRowCause = null;
       for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
         List<Put> failed = new ArrayList<Put>();
         if (singletonList) {
-          singleRowCause = this.processSinglePut(list.get(0), failed, tableName);
+          singleRowCause = this.processSinglePut(list.get(0), failed, tableName, options);
         } else {
-          this.processBatchOfMultiPut(list, failed, tableName);
+          this.processBatchOfMultiPut(list, failed, tableName, options);
         }
 
         list.clear();
@@ -1879,12 +1864,12 @@ public class HConnectionManager {
 
     private Callable<MultiPutResponse> createPutCallable(
         final HServerAddress address, final MultiPut puts,
-        final byte [] tableName) {
+        final byte [] tableName, final HBaseRPCOptions options) {
       final HConnection connection = this;
       return new Callable<MultiPutResponse>() {
         public MultiPutResponse call() throws IOException {
           return getRegionServerWithoutRetries(
-              new ServerCallable<MultiPutResponse>(connection, tableName, null) {
+              new ServerCallable<MultiPutResponse>(connection, tableName, null, options) {
                 public MultiPutResponse call() throws IOException {
                   MultiPutResponse resp = server.multiPut(puts);
                   resp.request = puts;
@@ -1897,7 +1882,7 @@ public class HConnectionManager {
                 }
                 @Override
                 public void instantiateServer() throws IOException {
-                  server = connection.getHRegionConnection(address);
+                  server = connection.getHRegionConnection(address, options);
                 }
               }
           );

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Jul  3 20:41:02 2012
@@ -51,6 +51,9 @@ import org.apache.hadoop.hbase.NotServin
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.ipc.ProfilingData;
 import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
@@ -77,6 +80,7 @@ public class HTable implements HTableInt
   private long currentWriteBufferSize;
   protected int scannerCaching;
   private int maxKeyValueSize;
+  private HBaseRPCOptions options;
 
   private long maxScannerResultSize;
 
@@ -160,7 +164,12 @@ public class HTable implements HTableInt
       HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
     this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
-
+    this.options = new HBaseRPCOptions ();
+    String compressionAlgo = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY);
+    if (compressionAlgo != null) {
+      this.options.setRPCCompression(
+          Compression.getCompressionAlgorithmByName(compressionAlgo));
+    }
   }
 
   public Configuration getConfiguration() {
@@ -499,7 +508,7 @@ public class HTable implements HTableInt
    public Result getRowOrBefore(final byte[] row, final byte[] family)
    throws IOException {
      return connection.getRegionServerWithRetries(
-         new ServerCallable<Result>(connection, tableName, row) {
+         new ServerCallable<Result>(connection, tableName, row, this.options) {
        public Result call() throws IOException {
          return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
            row, family);
@@ -528,7 +537,7 @@ public class HTable implements HTableInt
 
   public Result get(final Get get) throws IOException {
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Result>(connection, tableName, get.getRow()) {
+        new ServerCallable<Result>(connection, tableName, get.getRow(), this.options) {
           public Result call() throws IOException {
             return server.get(location.getRegionInfo().getRegionName(), get);
           }
@@ -537,13 +546,17 @@ public class HTable implements HTableInt
   }
 
   public Result[] get(List<Get> gets) throws IOException {
-    return connection.processBatchOfGets(gets, tableName);
+    return connection.processBatchOfGets(gets, tableName, this.options);
+  }
+
+  public ProfilingData getProfilingData () {
+    return this.options.profilingResult;
   }
 
   public void delete(final Delete delete)
   throws IOException {
     connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, delete.getRow()) {
+        new ServerCallable<Boolean>(connection, tableName, delete.getRow(), this.options) {
           public Boolean call() throws IOException {
             server.delete(location.getRegionInfo().getRegionName(), delete);
             return null; // FindBugs NP_BOOLEAN_RETURN_NULL
@@ -556,7 +569,7 @@ public class HTable implements HTableInt
   throws IOException {
     int last = 0;
     try {
-      last = connection.processBatchOfDeletes(deletes, this.tableName);
+      last = connection.processBatchOfDeletes(deletes, this.tableName, this.options);
     } finally {
       deletes.subList(0, last).clear();
     }
@@ -602,7 +615,7 @@ public class HTable implements HTableInt
           "Invalid arguments to incrementColumnValue", npe);
     }
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Long>(connection, tableName, row) {
+        new ServerCallable<Long>(connection, tableName, row, this.options) {
           public Long call() throws IOException {
             return server.incrementColumnValue(
                 location.getRegionInfo().getRegionName(), row, family,
@@ -630,7 +643,7 @@ public class HTable implements HTableInt
       final Put put)
   throws IOException {
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, row) {
+        new ServerCallable<Boolean>(connection, tableName, row, this.options) {
           public Boolean call() throws IOException {
             return server.checkAndPut(location.getRegionInfo().getRegionName(),
                 row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
@@ -657,7 +670,7 @@ public class HTable implements HTableInt
       final Delete delete)
   throws IOException {
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, row) {
+        new ServerCallable<Boolean>(connection, tableName, row, this.options) {
           public Boolean call() throws IOException {
             return server.checkAndDelete(
                 location.getRegionInfo().getRegionName(),
@@ -674,7 +687,7 @@ public class HTable implements HTableInt
   @Override
   public void mutateRow(final RowMutations arm) throws IOException {
     connection.getRegionServerWithRetries(
-	    new ServerCallable<Void>(connection, tableName, arm.getRow()) {
+	    new ServerCallable<Void>(connection, tableName, arm.getRow(), this.options) {
 	      public Void call() throws IOException {
 	        server.mutateRow(location.getRegionInfo().getRegionName(), arm);
 	        return null;
@@ -687,7 +700,7 @@ public class HTable implements HTableInt
    */
   @Override
   public void mutateRow(final List<RowMutations> armList) throws IOException {
-	  connection.processBatchOfRowMutations(armList, this.tableName);
+	  connection.processBatchOfRowMutations(armList, this.tableName, this.options);
   }
 
   /**
@@ -703,7 +716,7 @@ public class HTable implements HTableInt
    */
   public boolean exists(final Get get) throws IOException {
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, get.getRow()) {
+        new ServerCallable<Boolean>(connection, tableName, get.getRow(), this.options) {
           public Boolean call() throws IOException {
             return server.
                 exists(location.getRegionInfo().getRegionName(), get);
@@ -714,7 +727,7 @@ public class HTable implements HTableInt
 
   public void flushCommits() throws IOException {
     try {
-      connection.processBatchOfPuts(writeBuffer, tableName);
+      connection.processBatchOfPuts(writeBuffer, tableName, this.options);
     } finally {
       if (clearBufferOnFail) {
         writeBuffer.clear();
@@ -752,7 +765,7 @@ public class HTable implements HTableInt
   public RowLock lockRow(final byte [] row)
   throws IOException {
     return connection.getRegionServerWithRetries(
-      new ServerCallable<RowLock>(connection, tableName, row) {
+      new ServerCallable<RowLock>(connection, tableName, row, this.options) {
         public RowLock call() throws IOException {
           long lockId =
               server.lockRow(location.getRegionInfo().getRegionName(), row);
@@ -765,7 +778,7 @@ public class HTable implements HTableInt
   public void unlockRow(final RowLock rl)
   throws IOException {
     connection.getRegionServerWithRetries(
-      new ServerCallable<Boolean>(connection, tableName, rl.getRow()) {
+      new ServerCallable<Boolean>(connection, tableName, rl.getRow(), this.options) {
         public Boolean call() throws IOException {
           server.unlockRow(location.getRegionInfo().getRegionName(),
               rl.getLockId());
@@ -974,7 +987,7 @@ public class HTable implements HTableInt
           Bytes.toStringBinary(localStartKey) + "'");
       }
       try {
-        callable = getScannerCallable(localStartKey, nbRows);
+        callable = getScannerCallable(localStartKey, nbRows, options);
         // Open a scanner on the region server starting at the
         // beginning of the region
         getConnection().getRegionServerWithRetries(callable);
@@ -987,10 +1000,10 @@ public class HTable implements HTableInt
     }
 
     protected ScannerCallable getScannerCallable(byte [] localStartKey,
-        int nbRows) {
+        int nbRows, HBaseRPCOptions options) {
       scan.setStartRow(localStartKey);
       ScannerCallable s = new ScannerCallable(getConnection(),
-        getTableName(), scan);
+        getTableName(), scan, options);
       s.setCaching(nbRows);
       return s;
     }
@@ -1211,4 +1224,24 @@ public class HTable implements HTableInt
     return HConnectionManager.getConnection(HBaseConfiguration.create()).
     getRegionCachePrefetch(tableName);
   }
+
+  @Override
+  public void setProfiling(boolean prof) {
+    options.setRequestProfiling (prof);
+  }
+
+  @Override
+  public boolean getProfiling() {
+    return options.getRequestProfiling ();
+  }
+
+  @Override
+  public void setTag (String tag) {
+    this.options.setTag (tag);
+  }
+
+  @Override
+  public String getTag () {
+    return this.options.getTag ();
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Tue Jul  3 20:41:02 2012
@@ -323,4 +323,31 @@ public interface HTableInterface {
    */
   void unlockRow(RowLock rl) throws IOException;
 
+  /**
+   * Set profiling on/off
+   *
+   * @param prof true for on, false for off.
+   */
+  void setProfiling(boolean prof);
+
+  /**
+   * Is profiling on/off?
+   *
+   * @return A boolean, true if profiling is on.
+   */
+  boolean getProfiling();
+
+  /**
+   * Set application tag, null for no tag
+   *
+   * @param tag the tag to set to
+   */
+  void setTag(String tag);
+
+  /**
+   * Get the current application tag.
+   *
+   * @return A string, the current application tag
+   */
+  String getTag();
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Tue Jul  3 20:41:02 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 
 /**
  * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
@@ -95,8 +96,9 @@ public class HTableMultiplexer {
    * @return true if the request can be accepted by its corresponding buffer queue.
    * @throws IOException
    */
-  public boolean put(final byte[] table, final Put put) throws IOException {
-    return put(table, put, this.retryNum);
+  public boolean put(final byte[] table, final Put put,
+      HBaseRPCOptions options) throws IOException {
+    return put(table, put, this.retryNum, options);
   }
 
   /**
@@ -107,14 +109,15 @@ public class HTableMultiplexer {
    * @return the list of puts which could not be queued
    * @throws IOException
    */
-  public List<Put> put(final byte[] table, final List<Put> puts) throws IOException {
+  public List<Put> put(final byte[] table, final List<Put> puts,
+      HBaseRPCOptions options) throws IOException {
     if (puts == null)
       return null;
     
     List <Put> failedPuts = null;
     boolean result;
     for (Put put : puts) {
-      result = put(table, put, this.retryNum);
+      result = put(table, put, this.retryNum, options);
       if (result == false) {
         
         // Create the failed puts list if necessary
@@ -138,7 +141,8 @@ public class HTableMultiplexer {
    * @return true if the request can be accepted by its corresponding buffer queue.
    * @throws IOException
    */
-  public boolean put(final byte[] table, final Put put, int retry) throws IOException {
+  public boolean put(final byte[] table, final Put put, int retry,
+      HBaseRPCOptions options) throws IOException {
     if (retry <= 0) {
       return false;
     }
@@ -154,7 +158,7 @@ public class HTableMultiplexer {
         // Add the put pair into its corresponding queue.
         queue = getBufferedQueue(addr);
         // Generate a MultiPutStatus obj and offer it into the queue
-        PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
+        PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry, options);
         
         return queue.offer(s);
       }
@@ -285,11 +289,13 @@ public class HTableMultiplexer {
     private final HRegionInfo regionInfo;
     private final Put put;
     private final int retryCount;
+    private final HBaseRPCOptions options;
     public PutStatus(final HRegionInfo regionInfo, final Put put,
-        final int retryCount) {
+        final int retryCount, final HBaseRPCOptions options) {
       this.regionInfo = regionInfo;
       this.put = put;
       this.retryCount = retryCount;
+      this.options = options;
     }
 
     public HRegionInfo getRegionInfo() {
@@ -301,6 +307,9 @@ public class HTableMultiplexer {
     public int getRetryCount() {
       return retryCount;
     }
+    public HBaseRPCOptions getOptions () {
+      return options;
+    }
   }
 
   private static class HTableFlushWorker implements Runnable {
@@ -346,7 +355,8 @@ public class HTableMultiplexer {
         return false;
       } else {
         // Retry one more time
-        return this.htableMultiplexer.put(tableName, failedPut, retryCount);
+        HBaseRPCOptions options = failedPutStatus.getOptions ();
+        return this.htableMultiplexer.put(tableName, failedPut, retryCount, options);
       }
     }
 
@@ -382,15 +392,18 @@ public class HTableMultiplexer {
           if (processingList.size() > 0) {
             // Create the MultiPut object
             MultiPut mput = new MultiPut(this.addr);
+            HBaseRPCOptions options = null;
             for (PutStatus putStatus: processingList) {
               // Update the MultiPut
               mput.add(putStatus.getRegionInfo().getRegionName(), 
                   putStatus.getPut());
+              if (putStatus.getOptions () != null) {
+                options = putStatus.getOptions ();
+              }
             }
             
             // Process this multiput request
-            List<Put> failed = connection.processSingleMultiPut(mput);
-            
+            List<Put> failed = connection.processSingleMultiPut(mput, options);
             if (failed != null) {
               if (failed.size() == processingList.size()) {
                 // All the puts for this region server are failed. Going to retry it later
@@ -434,4 +447,4 @@ public class HTableMultiplexer {
       }
     }
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java Tue Jul  3 20:41:02 2012
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 
 import java.io.IOException;
 
@@ -155,7 +156,7 @@ public class MetaScanner {
     do {
       final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY);
       callable = new ScannerCallable(connection, metaTableName,
-          scan);
+          scan, HBaseRPCOptions.DEFAULT);
       // Open scanner
       connection.getRegionServerWithRetries(callable);
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Tue Jul  3 20:41:02 2012
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.DoNotRetr
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.ipc.RemoteException;
 import org.mortbay.log.Log;
 
@@ -46,8 +47,9 @@ public class ScannerCallable extends Ser
    * @param tableName table callable is on
    * @param scan the scan to execute
    */
-  public ScannerCallable (HConnection connection, byte [] tableName, Scan scan) {
-    super(connection, tableName, scan.getStartRow());
+  public ScannerCallable (HConnection connection, byte [] tableName, 
+      Scan scan, HBaseRPCOptions options) {
+    super(connection, tableName, scan.getStartRow(), options);
     this.scan = scan;
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Tue Jul  3 20:41:02 2012
@@ -21,6 +21,8 @@
 package org.apache.hadoop.hbase.client;
 
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 
 import java.io.IOException;
@@ -36,6 +38,7 @@ public abstract class ServerCallable<T> 
   protected final byte [] row;
   protected HRegionLocation location;
   protected HRegionInterface server;
+  protected HBaseRPCOptions options;
 
   /**
    * @param connection connection callable is on
@@ -43,14 +46,26 @@ public abstract class ServerCallable<T> 
    * @param row row we are querying
    */
   public ServerCallable(HConnection connection, byte [] tableName, byte [] row) {
+    this (connection, tableName, row, HBaseRPCOptions.DEFAULT);
+  }
+
+
+  /**
+   * @param connection connection callable is on
+   * @param tableName table name callable is on
+   * @param row row we are querying
+   * @param options client options for ipc layer
+   */
+  public ServerCallable(HConnection connection, byte [] tableName, byte [] row, 
+      HBaseRPCOptions options) {
     this.connection = connection;
     this.tableName = tableName;
     this.row = row;
+    this.options = options;
   }
 
-
   /**
-   * 
+   *
    * @param reload set this to true if connection should re-find the region
    * @throws IOException
    */
@@ -63,7 +78,7 @@ public abstract class ServerCallable<T> 
    * @throws IOException e
    */
   public void instantiateServer() throws IOException {
-    this.server = connection.getHRegionConnection(location.getServerAddress());
+    this.server = connection.getHRegionConnection(location.getServerAddress(), this.options);
   }
 
   /** @return the server name */
@@ -86,4 +101,4 @@ public abstract class ServerCallable<T> 
   public byte [] getRow() {
     return row;
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue Jul  3 20:41:02 2012
@@ -68,6 +68,8 @@ import org.apache.hadoop.hbase.filter.Sk
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.ipc.ProfilingData;
 import org.apache.hadoop.hbase.master.AssignmentPlan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -109,6 +111,10 @@ public class HbaseObjectWritable impleme
   // sending of the class name using reflection, etc.
   private static final byte NOT_ENCODED = 0;
   static {
+    
+    // Add new objects to the end of the list to preserve
+    // old protocol numbers
+    
     byte code = NOT_ENCODED + 1;
     // Primitive types.
     addToMap(Boolean.TYPE, code++);
@@ -160,7 +166,6 @@ public class HbaseObjectWritable impleme
     addToMap(Result.class, code++);
     addToMap(Result[].class, code++);
     addToMap(Scan.class, code++);
-
     addToMap(WhileMatchFilter.class, code++);
     addToMap(PrefixFilter.class, code++);
     addToMap(PageFilter.class, code++);
@@ -202,6 +207,9 @@ public class HbaseObjectWritable impleme
     addToMap(AssignmentPlan.class, code++);
 
     addToMap(RowMutations.class, code++);
+    
+    addToMap(HBaseRPCOptions.class, code++);
+    addToMap(ProfilingData.class, code++);
   }
 
   private Class<?> declaredClass;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Jul  3 20:41:02 2012
@@ -94,7 +94,6 @@ public class HBaseClient {
   private final int connectionTimeOutMillSec; // the connection time out
 
   protected final SocketFactory socketFactory;           // how to create sockets
-  private int refCount = 1;
 
   final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
   final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
@@ -122,31 +121,6 @@ public class HBaseClient {
     return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
   }
 
-  /**
-   * Increment this client's reference count
-   *
-   */
-  synchronized void incCount() {
-    refCount++;
-  }
-
-  /**
-   * Decrement this client's reference count
-   *
-   */
-  synchronized void decCount() {
-    refCount--;
-  }
-
-  /**
-   * Return if this client has no reference
-   *
-   * @return true if this client has no reference; false otherwise
-   */
-  synchronized boolean isZeroReference() {
-    return refCount==0;
-  }
-
   /** A call waiting for a value. */
   private class Call {
     final int id;                                       // call id
@@ -154,9 +128,8 @@ public class HBaseClient {
     Writable value;                               // value, null if error
     IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
-    protected Compression.Algorithm compressionAlgo =
-      Compression.Algorithm.NONE;
     protected int version = HBaseServer.CURRENT_VERSION;
+    public HBaseRPCOptions options;
 
     protected Call(Writable param) {
       this.param = param;
@@ -199,14 +172,6 @@ public class HBaseClient {
     public int getVersion() {
       return version;
     }
-
-    public void setRPCCompression(Compression.Algorithm compressionAlgo) {
-      this.compressionAlgo = compressionAlgo;
-    }
-
-    public Compression.Algorithm getRPCCompression() {
-      return this.compressionAlgo;
-    }
   }
 
   /** Thread that reads responses and notifies callers.  Each connection owns a
@@ -520,7 +485,6 @@ public class HBaseClient {
       if (shouldCloseConnection.get()) {
         return;
       }
-
       DataOutputStream uncompressedOS = null;
       DataOutputStream outOS = null;
       try {
@@ -535,24 +499,23 @@ public class HBaseClient {
           try {
             // 1. write the call id uncompressed
             uncompressedOS.writeInt(call.id);
+         
+            // 2. write RPC options uncompressed
+            if (call.version >= HBaseServer.VERSION_RPCOPTIONS) {
+              call.options.write(outOS);
+            }
 
             // preserve backwards compatibility
-            if (call.getRPCCompression() != Compression.Algorithm.NONE) {
-              // 2. write the compression algo used to compress the request being sent
-              uncompressedOS.writeUTF(call.getRPCCompression().getName());
-              
-              // 3. write the compression algo to use for the response
-              uncompressedOS.writeUTF(call.getRPCCompression().getName());
-              
-              // 4. setup the compressor
-              Compressor compressor = call.getRPCCompression().getCompressor();
+            if (call.options.getRPCCompression() != Compression.Algorithm.NONE) {
+              // 3. setup the compressor
+              Compressor compressor = call.options.getRPCCompression().getCompressor();
               OutputStream compressedOutputStream =
-                  call.getRPCCompression().createCompressionStream(
-                      uncompressedOS, compressor, 0);
+                call.options.getRPCCompression().createCompressionStream(
+                  uncompressedOS, compressor, 0);
               outOS = new DataOutputStream(compressedOutputStream);
             }
 
-            // 5. write the output params with the correct compression type
+            // 4. write the output params with the correct compression type
             call.param.write(outOS);
             outOS.flush();
             baos.flush();
@@ -595,7 +558,6 @@ public class HBaseClient {
         return;
       }
       touch();
-
       try {
         DataInputStream localIn = in;
 
@@ -604,11 +566,10 @@ public class HBaseClient {
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " got value #" + id);
         Call call = calls.get(id);
-
         // 2. read the error boolean uncompressed
         boolean isError = localIn.readBoolean();
 
-        if (call.getVersion() >= HBaseServer.VERSION_COMPRESSED_RPC) {
+        if (call.getVersion() >= HBaseServer.VERSION_RPCOPTIONS) {
           // 3. read the compression type used for the rest of the response
           String compressionAlgoName = localIn.readUTF();
           Compression.Algorithm rpcCompression =
@@ -622,7 +583,6 @@ public class HBaseClient {
             localIn = new DataInputStream(is);
           }
         }
-
         // 5. read the rest of the value
         if (isError) {
           //noinspection ThrowableInstanceNeverThrown
@@ -632,6 +592,14 @@ public class HBaseClient {
         } else {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(localIn);                 // read value
+          call.options.profilingResult = null;  // clear out previous results
+          if (call.getVersion() >= HBaseServer.VERSION_RPCOPTIONS) {
+            boolean hasProfiling = localIn.readBoolean ();
+            if (hasProfiling) {
+              call.options.profilingResult = new ProfilingData ();
+              call.options.profilingResult.readFields(localIn);
+            }
+          }
           call.setValue(value);
           calls.remove(id);
         }
@@ -824,17 +792,17 @@ public class HBaseClient {
    * @return Writable
    * @throws IOException e
    */
-  public Writable call(Writable param, InetSocketAddress address)
+  public Writable call(Writable param, InetSocketAddress address, HBaseRPCOptions options)
   throws IOException {
-      return call(param, address, null, 0, Compression.Algorithm.NONE);
+      return call(param, address, null, 0, options);
   }
 
   public Writable call(Writable param, InetSocketAddress addr,
                        UserGroupInformation ticket, int rpcTimeout,
-                       Compression.Algorithm rpcCompression)
+                       HBaseRPCOptions options)
                        throws IOException {
     Call call = new Call(param);
-    call.setRPCCompression(rpcCompression);
+    call.options = options;
     Connection connection = getConnection(addr, ticket, rpcTimeout, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;
@@ -907,7 +875,7 @@ public class HBaseClient {
    * @throws IOException e
    */
   public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
-      Compression.Algorithm rpcCompression)
+      HBaseRPCOptions options)
     throws IOException {
     if (addresses.length == 0) return new Writable[0];
 
@@ -917,7 +885,7 @@ public class HBaseClient {
     synchronized (results) {
       for (int i = 0; i < params.length; i++) {
         ParallelCall call = new ParallelCall(params[i], results, i);
-        call.setRPCCompression(rpcCompression);
+        call.options = options;
         try {
           Connection connection = getConnection(addresses[i], null, 0, call);
           connection.sendParam(call);             // send each parameter
@@ -951,11 +919,12 @@ public class HBaseClient {
     }
     // RPC compression is only supported from version 4, so make backward compatible
     byte version = HBaseServer.CURRENT_VERSION;
-    if (call.getRPCCompression() == Compression.Algorithm.NONE) {
+    if (call.options.getRPCCompression() == Compression.Algorithm.NONE
+        && !call.options.getRequestProfiling ()
+        && call.options.getTag () == null) {
       version = HBaseServer.VERSION_3;
     }
     call.setVersion(version);
-
     Connection connection;
     /* we could avoid this allocation for each RPC by having a
      * connectionsId object and with set() method. We need to manage the

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Jul  3 20:41:02 2012
@@ -57,6 +57,8 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** A simple RPC mechanism.
  *
@@ -85,6 +87,9 @@ import java.util.Map;
  */
 public class HBaseRPC {
   protected static final Log LOG = LogFactory.getLog(HBaseRPC.class.getName());
+  
+  private final static Map<InetSocketAddress, Long> versions =
+      new ConcurrentHashMap<InetSocketAddress, Long>();
 
   private HBaseRPC() {
     super();
@@ -202,8 +207,6 @@ public class HBaseRPC {
         // Make an hbase client instead of hadoop Client.
         client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
         clients.put(factory, client);
-      } else {
-        client.incCount();
       }
       return client;
     }
@@ -221,18 +224,18 @@ public class HBaseRPC {
 
     /**
      * Stop a RPC client connection
-     * A RPC client is closed only when its reference count becomes zero.
      * @param client client to stop
      */
     protected void stopClient(HBaseClient client) {
       synchronized (this) {
-        client.decCount();
-        if (client.isZeroReference()) {
-          clients.remove(client.getSocketFactory());
-        }
+        clients.remove(client.getSocketFactory());
       }
-      if (client.isZeroReference()) {
-        client.stop();
+      client.stop();
+    }
+    
+    protected void stopClients () {
+      for (Map.Entry<SocketFactory, HBaseClient> e : clients.entrySet()) {
+        this.stopClient (e.getValue ());
       }
     }
   }
@@ -245,8 +248,7 @@ public class HBaseRPC {
     private HBaseClient client;
     private boolean isClosed = false;
     final private int rpcTimeout;
-    private Compression.Algorithm rpcCompression =
-      HConstants.DEFAULT_HBASE_RPC_COMPRESSION;
+    public HBaseRPCOptions options;
 
     /**
      * @param address address for invoker
@@ -255,16 +257,13 @@ public class HBaseRPC {
      * @param factory socket factory
      */
     public Invoker(InetSocketAddress address, UserGroupInformation ticket,
-                   Configuration conf, SocketFactory factory, int rpcTimeout) {
+                   Configuration conf, SocketFactory factory, 
+                   int rpcTimeout, HBaseRPCOptions options) {
       this.address = address;
       this.ticket = ticket;
       this.client = CLIENTS.getClient(conf, factory);
       this.rpcTimeout = rpcTimeout;
-      String compressionAlgo = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY);
-      if (compressionAlgo != null) {
-        rpcCompression =
-          Compression.getCompressionAlgorithmByName(compressionAlgo);
-      }
+      this.options = options;
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)
@@ -276,7 +275,7 @@ public class HBaseRPC {
       }
       HbaseObjectWritable value = (HbaseObjectWritable)
         client.call(new Invocation(method, args), address, ticket,
-            rpcTimeout, rpcCompression);
+            rpcTimeout, options);
       if (isTraceEnabled) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.trace("Call: " + method.getName() + " " + callTime);
@@ -292,6 +291,10 @@ public class HBaseRPC {
       }
     }
   }
+  
+  public static void stopClients () {
+    CLIENTS.stopClients();
+  }
 
   /**
    * A version mismatch for the RPC protocol.
@@ -356,9 +359,9 @@ public class HBaseRPC {
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, Configuration conf,
-      SocketFactory factory, int rpcTimeout) throws IOException {
+      SocketFactory factory, int rpcTimeout, HBaseRPCOptions options) throws IOException {
     return getProxy(protocol, clientVersion, addr, null, conf, factory,
-        rpcTimeout);
+        rpcTimeout, options);
   }
 
   /**
@@ -377,19 +380,24 @@ public class HBaseRPC {
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
-      Configuration conf, SocketFactory factory, int rpcTimeout)
+      Configuration conf, SocketFactory factory, int rpcTimeout, HBaseRPCOptions options)
   throws IOException {
     VersionedProtocol proxy =
         (VersionedProtocol) Proxy.newProxyInstance(
             protocol.getClassLoader(), new Class[]{protocol},
-            new Invoker(addr, ticket, conf, factory, rpcTimeout));
-    long serverVersion = proxy.getProtocolVersion(protocol.getName(),
-                                                  clientVersion);
-    if (serverVersion == clientVersion) {
+            new Invoker(addr, ticket, conf, factory, rpcTimeout, options));
+    
+    Long serverVersion = versions.get (addr);
+    if (serverVersion == null) {
+      serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
+      versions.put (addr, serverVersion);
+    }
+    
+    if ((long) serverVersion == clientVersion) {
       return proxy;
     }
     throw new VersionMismatch(protocol.getName(), clientVersion,
-                              serverVersion);
+                              (long) serverVersion);
   }
 
   /**
@@ -405,11 +413,21 @@ public class HBaseRPC {
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, Configuration conf,
-      int rpcTimeout)
+      int rpcTimeout, HBaseRPCOptions options)
       throws IOException {
 
     return getProxy(protocol, clientVersion, addr, conf, NetUtils
-        .getDefaultSocketFactory(conf), rpcTimeout);
+        .getDefaultSocketFactory(conf), rpcTimeout, options);
+  }
+  
+  /* This is needed for unit tests. Some tests start multiple
+   * region servers using a single HBaseClient object. We need to
+   * reference count so that the first region server to shut down
+   * does not shut down the HBaseClient object.
+   */
+  static AtomicInteger numProxies = new AtomicInteger (0);
+  public static void startProxy () {
+    numProxies.incrementAndGet();
   }
 
   /**
@@ -417,55 +435,13 @@ public class HBaseRPC {
    * @param proxy the proxy to be stopped
    */
   public static void stopProxy(VersionedProtocol proxy) {
-    if (proxy!=null) {
+    numProxies.decrementAndGet();
+    if (proxy!=null && numProxies.get () <= 0) {
       ((Invoker)Proxy.getInvocationHandler(proxy)).close();
     }
   }
 
   /**
-   * Expert: Make multiple, parallel calls to a set of servers.
-   *
-   * @param method method to invoke
-   * @param params array of parameters
-   * @param addrs array of addresses
-   * @param conf configuration
-   * @return values
-   * @throws IOException e
-   */
-  public static Object[] call(Method method, Object[][] params,
-                              InetSocketAddress[] addrs, Configuration conf)
-    throws IOException {
-
-    Invocation[] invocations = new Invocation[params.length];
-    for (int i = 0; i < params.length; i++)
-      invocations[i] = new Invocation(method, params[i]);
-    HBaseClient client = CLIENTS.getClient(conf);
-    Compression.Algorithm rpcCompression = Compression.Algorithm.NONE;
-    String compressionAlgo = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY);
-    if (compressionAlgo != null) {
-      rpcCompression =
-          Compression.getCompressionAlgorithmByName(compressionAlgo);
-    }
-    try {
-    Writable[] wrappedValues = client.call(invocations, addrs, rpcCompression);
-
-    if (method.getReturnType() == Void.TYPE) {
-      return null;
-    }
-
-    Object[] values =
-      (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
-    for (int i = 0; i < values.length; i++)
-      if (wrappedValues[i] != null)
-        values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
-
-    return values;
-    } finally {
-      CLIENTS.stopClient(client);
-    }
-  }
-
-  /**
    * Construct a server for a protocol implementation instance listening on a
    * port and address.
    *

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java?rev=1356924&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java Tue Jul  3 20:41:02 2012
@@ -0,0 +1,112 @@
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.ipc.HBaseRPC.VersionMismatch;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Per-call RPC options sent from client to server: compression algorithm,
+ * profiling request flag, and an application-supplied tag. Serialized at the
+ * head of each request for protocol version VERSION_RPCOPTIONS and later.
+ * The {@link #profilingResult} field is client-side only and is never
+ * serialized.
+ */
+public class HBaseRPCOptions implements Writable {
+
+  /** Shared default options: no compression, no profiling, no tag. */
+  public static final HBaseRPCOptions DEFAULT = new HBaseRPCOptions();
+
+  private static final byte VERSION_INITIAL = 1;
+
+  private byte version = VERSION_INITIAL;
+  private Compression.Algorithm compressionAlgo = Compression.Algorithm.NONE;
+  private boolean requestProfiling = false;
+  private String tag = null;
+
+  // this will be used as profiling data in htable so it's possible to
+  // set it after receiving profiling data. do not need to serialize this.
+  public ProfilingData profilingResult = new ProfilingData ();
+
+  public HBaseRPCOptions() {}
+
+  public void setVersion(byte version) {
+    this.version = version;
+  }
+
+  public byte getVersion() {
+    return this.version;
+  }
+
+  public void setRPCCompression(Compression.Algorithm compressionAlgo) {
+    this.compressionAlgo = compressionAlgo;
+  }
+
+  public Compression.Algorithm getRPCCompression() {
+    return this.compressionAlgo;
+  }
+
+  /**
+   * set whether to request profiling data from the server
+   *
+   * @param request request profiling or not
+   */
+  public void setRequestProfiling(boolean request) {
+    this.requestProfiling = request;
+  }
+
+  public boolean getRequestProfiling() {
+    return this.requestProfiling;
+  }
+
+  /**
+   * set the tag of this rpc call. The server will aggregate stats
+   * based on the tag of the rpc call. typically this is unique to the
+   * application making the call
+   *
+   * @param tag RPC tag of the call
+   */
+  public void setTag(String tag) {
+    this.tag = tag;
+  }
+
+  public String getTag() {
+    return this.tag;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // 1. write the object version
+    out.writeByte(this.version);
+
+    // 2. write the compression algo used to compress the request being sent
+    out.writeUTF(this.compressionAlgo.getName());
+
+    // 3. write the compression algo to use for the response. Currently the
+    //    same algorithm is used in both directions, but the wire format
+    //    keeps two fields so they can diverge in a later version.
+    out.writeUTF(this.compressionAlgo.getName());
+
+    // 4. write profiling request flag
+    out.writeBoolean(this.requestProfiling);
+
+    // 5. write tag presence flag, then the tag itself when present
+    out.writeBoolean(this.tag != null);
+    if (this.tag != null) {
+      out.writeUTF(this.tag);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.version = in.readByte();
+    if (this.version > VERSION_INITIAL) {
+      // this version is not handled!
+      throw new VersionMismatch("HBaseRPCOptions", this.version,
+          VERSION_INITIAL);
+    }
+    // Two compression names are read to mirror write(); only the second
+    // (response-side) value is kept since both are currently identical.
+    String compressionName;
+    compressionName = in.readUTF();
+    compressionName = in.readUTF();           // dummy read
+    this.compressionAlgo = Compression.
+        getCompressionAlgorithmByName(compressionName);
+    this.requestProfiling = in.readBoolean();
+    this.tag = null;
+    if (in.readBoolean()) {
+      this.tag = in.readUTF();
+    }
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Jul  3 20:41:02 2012
@@ -96,10 +96,11 @@ public abstract class HBaseServer {
   // 1 : Introduce ping and server does not throw away RPCs
   // 3 : RPC was refactored in 0.19
   public static final byte VERSION_3 = 3;
-  // 4 : includes support for compression on RPCs
-  public static final byte VERSION_COMPRESSED_RPC = 4;
+  // 4 : RPC options object in RPC protocol with compression,
+  //     profiling, and tagging
+  public static final byte VERSION_RPCOPTIONS = 4;
 
-  public static final byte CURRENT_VERSION = VERSION_COMPRESSED_RPC;
+  public static final byte CURRENT_VERSION = VERSION_RPCOPTIONS;
 
   /**
    * How many calls/handler are allowed in the queue.
@@ -238,6 +239,9 @@ public abstract class HBaseServer {
     protected Compression.Algorithm compressionAlgo =
       Compression.Algorithm.NONE;
     protected int version = CURRENT_VERSION;     // version used for the call
+    
+    protected boolean shouldProfile = false;
+    protected ProfilingData profilingData = null;
 
     public Call(int id, Writable param, Connection connection) {
       this.id = id;
@@ -991,20 +995,14 @@ public abstract class HBaseServer {
       int id = uncompressedIs.readInt();
       if (LOG.isTraceEnabled())
         LOG.trace(" got #" + id);
-
-      if (version >= VERSION_COMPRESSED_RPC) {
-
-        // 2. read the compression used for the request
-        String rxCompressionAlgoName = uncompressedIs.readUTF();
-        rxCompression =
-          Compression.getCompressionAlgorithmByName(rxCompressionAlgoName);
-
-        // 3. read the compression requested for the response
-        String txCompressionAlgoName = uncompressedIs.readUTF();
-        txCompression =
-          Compression.getCompressionAlgorithmByName(txCompressionAlgoName);
-
-        // 4. set up a decompressor to read the rest of the request
+      
+      HBaseRPCOptions options = new HBaseRPCOptions ();
+      if (version >= VERSION_RPCOPTIONS) {
+        // 2. read rpc options uncompressed
+        options.readFields(dis);
+        rxCompression = options.getRPCCompression();
+        txCompression = options.getRPCCompression();
+        // 3. set up a decompressor to read the rest of the request
         if (rxCompression != Compression.Algorithm.NONE) {
           Decompressor decompressor = rxCompression.getDecompressor();
           InputStream is = rxCompression.createDecompressionStream(
@@ -1012,12 +1010,13 @@ public abstract class HBaseServer {
           dis = new DataInputStream(is);
         }
       }
-
-      // 5. read the rest of the params
+      // 4. read the rest of the params
       Writable param = ReflectionUtils.newInstance(paramClass, conf);
       param.readFields(dis);
 
       Call call = new Call(id, param, this);
+      call.shouldProfile = options.getRequestProfiling ();
+      
       call.setRPCCompression(txCompression);
       call.setVersion(version);
       callQueue.put(call);              // queue the call; maybe blocked here
@@ -1068,9 +1067,13 @@ public abstract class HBaseServer {
           String errorClass = null;
           String error = null;
           Writable value = null;
+          call.profilingData = new ProfilingData ();
+          HRegionServer.threadLocalProfilingData.set (call.profilingData);
+          
           CurCall.set(call);
           UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
           UserGroupInformation.setCurrentUser(call.connection.ticket);
+          long start = System.currentTimeMillis ();
           try {
             // make the call
             value = call(call.param, call.timestamp, status);
@@ -1079,8 +1082,11 @@ public abstract class HBaseServer {
             errorClass = e.getClass().getName();
             error = StringUtils.stringifyException(e);
           }
+          long total = System.currentTimeMillis () - start;
+          call.profilingData.addLong("total_server_time.ms", total);
           UserGroupInformation.setCurrentUser(previous);
           CurCall.set(null);
+          HRegionServer.threadLocalProfilingData.remove ();
 
           int size = BUFFER_INITIAL_SIZE;
           if (value instanceof WritableWithSize) {
@@ -1106,11 +1112,11 @@ public abstract class HBaseServer {
 
           // 1. write call id uncompressed
           out.writeInt(call.id);
-
+          
           // 2. write error flag uncompressed
           out.writeBoolean(error != null);
-
-          if (call.getVersion() >= VERSION_COMPRESSED_RPC) {
+          
+          if (call.getVersion() >= VERSION_RPCOPTIONS) {
             // 3. write the compression type for the rest of the response
             out.writeUTF(call.getRPCCompression().getName());
 
@@ -1126,6 +1132,15 @@ public abstract class HBaseServer {
           // 5. write the output as per the compression
           if (error == null) {
             value.write(out);
+            // write profiling data if requested
+            if (call.getVersion () >= VERSION_RPCOPTIONS) {
+            	if (!call.shouldProfile) {
+            		out.writeBoolean(false);
+            	} else {
+            		out.writeBoolean(true);
+            		call.profilingData.write(out);
+            	}
+            }
           } else {
             WritableUtils.writeString(out, errorClass);
             WritableUtils.writeString(out, error);

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ProfilingData.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ProfilingData.java?rev=1356924&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ProfilingData.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ProfilingData.java Tue Jul  3 20:41:02 2012
@@ -0,0 +1,126 @@
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+
+/*
+ * A container of profiling metrics keyed by String, with typed values
+ * (String, long, int, boolean, float), pretty-printable via toString().
+ */
+
+public class ProfilingData implements Writable {
+
+  /**
+   * Charset used for every key/value string. Fixed to UTF-8 so the
+   * serialized form does not depend on the platform default encoding of
+   * either the client or the server.
+   */
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+
+  private MapWritable mapString = new MapWritable();
+  private MapWritable mapLong = new MapWritable();
+  private MapWritable mapInt = new MapWritable();
+  private MapWritable mapBoolean = new MapWritable();
+  private MapWritable mapFloat = new MapWritable();
+
+  public ProfilingData() {}
+
+  /** Wrap a String as a BytesWritable map key/value using UTF-8. */
+  private static BytesWritable encode(String s) {
+    return new BytesWritable(s.getBytes(UTF8));
+  }
+
+  /**
+   * Decode a BytesWritable produced by {@link #encode}. Only the first
+   * getSize() bytes are used: after deserialization the backing array
+   * returned by get() may be larger than the logical size, so decoding the
+   * whole array could append garbage bytes.
+   */
+  private static String decode(BytesWritable bw) {
+    return new String(bw.get(), 0, bw.getSize(), UTF8);
+  }
+
+  /** Store a string metric under the given key. */
+  public void addString(String key, String val) {
+    mapString.put(encode(key), encode(val));
+  }
+
+  /** Fetch a string metric; throws NullPointerException if absent. */
+  public String getString(String key) {
+    return decode((BytesWritable) mapString.get(encode(key)));
+  }
+
+  /** Store a long metric under the given key. */
+  public void addLong(String key, long val) {
+    mapLong.put(encode(key), new LongWritable(val));
+  }
+
+  /** Fetch a long metric; throws NullPointerException if absent. */
+  public long getLong(String key) {
+    return ((LongWritable) mapLong.get(encode(key))).get();
+  }
+
+  /** Store an int metric under the given key. */
+  public void addInt(String key, int val) {
+    mapInt.put(encode(key), new IntWritable(val));
+  }
+
+  /** Fetch an int metric; throws NullPointerException if absent. */
+  public int getInt(String key) {
+    return ((IntWritable) mapInt.get(encode(key))).get();
+  }
+
+  /** Store a boolean metric under the given key. */
+  public void addBoolean(String key, boolean val) {
+    mapBoolean.put(encode(key), new BooleanWritable(val));
+  }
+
+  /** Fetch a boolean metric; throws NullPointerException if absent. */
+  public boolean getBoolean(String key) {
+    return ((BooleanWritable) mapBoolean.get(encode(key))).get();
+  }
+
+  /** Store a float metric under the given key. */
+  public void addFloat(String key, float val) {
+    mapFloat.put(encode(key), new FloatWritable(val));
+  }
+
+  /** Fetch a float metric; throws NullPointerException if absent. */
+  public float getFloat(String key) {
+    return ((FloatWritable) mapFloat.get(encode(key))).get();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Maps are serialized in a fixed order; readFields must match.
+    mapString.write(out);
+    mapBoolean.write(out);
+    mapInt.write(out);
+    mapLong.write(out);
+    mapFloat.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    mapString.readFields(in);
+    mapBoolean.readFields(in);
+    mapInt.readFields(in);
+    mapLong.readFields(in);
+    mapFloat.readFields(in);
+  }
+
+  /** Pretty-prints every metric as "key : value", one per line. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<Writable, Writable> entry : mapString.entrySet()) {
+      sb.append(decode((BytesWritable) entry.getKey()) + " : "
+          + decode((BytesWritable) entry.getValue()) + "\n");
+    }
+    for (Map.Entry<Writable, Writable> entry : mapBoolean.entrySet()) {
+      sb.append(decode((BytesWritable) entry.getKey()) + " : "
+          + ((BooleanWritable) entry.getValue()).get() + "\n");
+    }
+    for (Map.Entry<Writable, Writable> entry : mapInt.entrySet()) {
+      sb.append(decode((BytesWritable) entry.getKey()) + " : "
+          + ((IntWritable) entry.getValue()).get() + "\n");
+    }
+    for (Map.Entry<Writable, Writable> entry : mapLong.entrySet()) {
+      sb.append(decode((BytesWritable) entry.getKey()) + " : "
+          + ((LongWritable) entry.getValue()).get() + "\n");
+    }
+    for (Map.Entry<Writable, Writable> entry : mapFloat.entrySet()) {
+      sb.append(decode((BytesWritable) entry.getKey()) + " : "
+          + ((FloatWritable) entry.getValue()).get() + "\n");
+    }
+    return sb.toString();
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jul  3 20:41:02 2012
@@ -103,11 +103,13 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ProfilingData;
 import org.apache.hadoop.hbase.master.AssignmentPlan;
 import org.apache.hadoop.hbase.master.RegionPlacement;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerDynamicMetrics;
@@ -315,6 +317,9 @@ public class HRegionServer implements HR
 
   private String stopReason = "not stopping";
 
+  // profiling threadlocal
+  public static final ThreadLocal<ProfilingData> threadLocalProfilingData = new ThreadLocal<ProfilingData> ();
+
   /**
    * Starts a HRegionServer at the default location
    * @param conf
@@ -1353,6 +1358,7 @@ public class HRegionServer implements HR
    * by this hosting server.  Worker logs the exception and exits.
    */
   private void startServiceThreads() throws IOException {
+    HBaseRPC.startProxy();
     String n = Thread.currentThread().getName();
     UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
       @Override
@@ -1585,7 +1591,8 @@ public class HRegionServer implements HR
         // should retry indefinitely.
         master = (HMasterRegionInterface)HBaseRPC.getProxy(
           HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
-          masterAddress.getInetSocketAddress(), this.conf, this.rpcTimeout);
+          masterAddress.getInetSocketAddress(), this.conf, this.rpcTimeout, 
+          HBaseRPCOptions.DEFAULT);
       } catch (IOException e) {
         LOG.warn("Unable to connect to master. Retrying. Error was:", e);
         sleeper.sleep();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Tue Jul  3 20:41:02 2012
@@ -614,4 +614,20 @@ public class RemoteHTable implements HTa
 		throws IOException {
     throw new IOException("atomicMutation not supported");
   }
+
+  /** No-op: the REST client does not forward the profiling flag. */
+  @Override
+  public void setProfiling(boolean prof) {}
+
+  /** Always false: profiling is never enabled through the REST client. */
+  @Override
+  public boolean getProfiling() {
+    return false;
+  }
+
+  /** No-op: the REST client does not forward request tags. */
+  @Override
+  public void setTag (String tag) {}
+
+  /** Always null: no tag is ever stored by the REST client. */
+  @Override
+  public String getTag () {
+    return null;
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/ruby/hbase/table.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/ruby/hbase/table.rb?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/ruby/hbase/table.rb (original)
+++ hbase/branches/0.89-fb/src/main/ruby/hbase/table.rb Tue Jul  3 20:41:02 2012
@@ -43,7 +43,23 @@ module Hbase
     def initialize(configuration, table_name, formatter)
       @table = HTable.new(configuration, table_name)
     end
+    
+    #----------------------------------------------------------------------------------------------
+    # Set profiling on or off
+    # Forward the profiling on/off flag to the underlying Java HTable.
+    def set_profiling(prof)
+      @table.setProfiling(prof)
+    end
 
+    #----------------------------------------------------------------------------------------------
+    # Get profiling data
+    def get_profiling()
+      data = @table.getProfilingData()
+      if data == nil
+        return nil
+      else
+        return data.toString()
+      end
+    end
     #----------------------------------------------------------------------------------------------
     # Put a cell 'value' at specified table/row/column
     def put(row, column, value, timestamp = nil)

Modified: hbase/branches/0.89-fb/src/main/ruby/shell.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/ruby/shell.rb?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/ruby/shell.rb (original)
+++ hbase/branches/0.89-fb/src/main/ruby/shell.rb Tue Jul  3 20:41:02 2012
@@ -70,17 +70,32 @@ module Shell
     @debug = false
     attr_accessor :debug
 
+    @profiling = false
+    attr_accessor :profiling
+
+    @saved_table = nil  #used for retrieving profiling data
+
     def initialize(hbase, formatter)
       self.hbase = hbase
       self.formatter = formatter
     end
 
+    def get_profiling()
+      if @saved_table == nil
+        return nil
+      else
+        return @saved_table.get_profiling()
+      end
+    end
+
     def hbase_admin
       @hbase_admin ||= hbase.admin(formatter)
     end
 
     def hbase_table(name)
-      hbase.table(name, formatter)
+      # Remember the wrapper so get_profiling can reach it later, and
+      # apply the shell-wide profiling flag to every newly created table.
+      @saved_table = hbase.table(name, formatter)
+      @saved_table.set_profiling(@profiling)
+      return @saved_table
     end
 
     def export_commands(where)
@@ -252,6 +267,8 @@ Shell.load_command_group(
     flush
     major_compact
     move_region
+    set_profiling
+    get_profiling
     shutdown
     split
     zk

Added: hbase/branches/0.89-fb/src/main/ruby/shell/commands/get_profiling.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/ruby/shell/commands/get_profiling.rb?rev=1356924&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/ruby/shell/commands/get_profiling.rb (added)
+++ hbase/branches/0.89-fb/src/main/ruby/shell/commands/get_profiling.rb Tue Jul  3 20:41:02 2012
@@ -0,0 +1,46 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class GetProfiling < Command
+      def help
+        return <<-EOF
+          Print profiling data. Profiling data request can 
+          be set using set_profiling. Examples:
+
+            hbase> set_profiling 'on'
+            hbase> get 't1', 'r1'
+            hbase> get_profiling
+            hbase> set_profiling 'off'
+        EOF
+      end
+
+      def command()
+        if shell.get_profiling() == nil
+          puts "No profiling data."
+        else
+          puts "Profiling data:"
+          puts shell.get_profiling()
+        end
+      end
+    end
+  end
+end

Added: hbase/branches/0.89-fb/src/main/ruby/shell/commands/set_profiling.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/ruby/shell/commands/set_profiling.rb?rev=1356924&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/ruby/shell/commands/set_profiling.rb (added)
+++ hbase/branches/0.89-fb/src/main/ruby/shell/commands/set_profiling.rb Tue Jul  3 20:41:02 2012
@@ -0,0 +1,49 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetProfiling < Command
+      def help
+        return <<-EOF
+          Set profiling on or off. Profiling data can be printed
+          using get_profiling after an RPC call. Examples:
+
+            hbase> set_profiling 'on'
+            hbase> get 't1', 'r1'
+            hbase> get_profiling
+            hbase> set_profiling 'off'
+        EOF
+      end
+
+      def command(prof)
+        if prof == 'on'
+          shell.profiling = true
+          puts "Profiling is on"
+        elsif prof == 'off'
+          shell.profiling = false
+          puts "Profiling is off"
+        else
+          puts "Command ignored"
+        end
+      end
+    end
+  end
+end

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1356924&r1=1356923&r2=1356924&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Tue Jul  3 20:41:02 2012
@@ -62,6 +62,10 @@ public class MiniHBaseCluster {
 
   static long PREFERRED_ASSIGNMENT = 1000L;
   static long WAIT_FOR_LOADBALANCER = 2000L;
+  
+  // need a reference count so that the first cluster to shut down does
+  // not delete connections that the other clusters still need
+  static int numClusters = 0;
   /**
    * Start a MiniHBaseCluster.
    * @param conf Configuration to be used for cluster
@@ -84,6 +88,7 @@ public class MiniHBaseCluster {
       int numRegionServers)
   throws IOException, InterruptedException {
     this.conf = conf;
+    MiniHBaseCluster.numClusters ++;
     conf.set(HConstants.MASTER_PORT, "0");
     conf.setLong("hbase.master.applyPreferredAssignment.period",
         PREFERRED_ASSIGNMENT);
@@ -371,7 +376,10 @@ public class MiniHBaseCluster {
     if (this.hbaseCluster != null) {
       this.hbaseCluster.shutdown();
     }
-    HConnectionManager.deleteAllConnections(false);
+    MiniHBaseCluster.numClusters --;
+    if (MiniHBaseCluster.numClusters <= 0) {
+      HConnectionManager.deleteAllConnections();
+    }
   }
 
   /**



Mime
View raw message