hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1459013 [7/8] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-client/src/test/java/org...
Date Wed, 20 Mar 2013 19:36:47 GMT
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Wed Mar 20 19:36:46 2013
@@ -19,16 +19,19 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.common.base.Function;
-import com.google.protobuf.Message;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.IpcProtocol;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.Pair;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
+import com.google.common.base.Function;
+import com.google.protobuf.Message;
 
 @InterfaceAudience.Private
 public interface RpcServer {
@@ -47,19 +50,19 @@ public interface RpcServer {
   InetSocketAddress getListenerAddress();
 
   /** Called for each call.
+   * @param method Method to invoke.
    * @param param parameter
    * @param receiveTime time
-   * @return Message Protobuf response Message
+   * @param status
+   * @return Message Protobuf response Message and optionally the Cells that make up the response.
    * @throws java.io.IOException e
    */
-  Message call(Class<? extends IpcProtocol> protocol,
-      RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
-      throws IOException;
+  Pair<Message, CellScanner> call(Class<? extends IpcProtocol> protocol, Method method,
+    Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
+  throws IOException;
 
   void setErrorHandler(HBaseRPCErrorHandler handler);
 
-  void setQosFunction(Function<RpcRequestBody, Integer> newFunc);
-
   void openServer();
 
   void startThreads();
@@ -68,4 +71,6 @@ public interface RpcServer {
    * Returns the metrics instance for reporting RPC call statistics
    */
   MetricsHBaseServer getMetrics();
+
+  public void setQosFunction(Function<Pair<RequestHeader, Message>, Integer> newFunc);
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java Wed Mar 20 19:36:46 2013
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.Serializer;
@@ -45,43 +45,41 @@ public class KeyValueSerialization imple
   }
 
   public static class KeyValueDeserializer implements Deserializer<KeyValue> {
-    private InputStream is;
+    private DataInputStream dis;
 
     @Override
     public void close() throws IOException {
-      this.is.close();
+      this.dis.close();
     }
 
     @Override
     public KeyValue deserialize(KeyValue ignore) throws IOException {
       // I can't overwrite the passed in KV, not from a proto kv, not just yet.  TODO
-      HBaseProtos.KeyValue proto =
-        HBaseProtos.KeyValue.parseDelimitedFrom(this.is);
-      return ProtobufUtil.toKeyValue(proto);
+      return KeyValue.create(this.dis);
     }
 
     @Override
     public void open(InputStream is) throws IOException {
-      this.is = is;
+      this.dis = new DataInputStream(is);
     }
   }
 
   public static class KeyValueSerializer implements Serializer<KeyValue> {
-    private OutputStream os;
+    private DataOutputStream dos;
 
     @Override
     public void close() throws IOException {
-      this.os.close();
+      this.dos.close();
     }
 
     @Override
     public void open(OutputStream os) throws IOException {
-      this.os = os;
+      this.dos = new DataOutputStream(os);
     }
 
     @Override
     public void serialize(KeyValue kv) throws IOException {
-      ProtobufUtil.toKeyValue(kv).writeDelimitedTo(this.os);
+      KeyValue.write(kv, this.dos);
     }
   }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java Wed Mar 20 19:36:46 2013
@@ -25,8 +25,8 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.Serializer;
@@ -57,7 +57,7 @@ public class MutationSerialization imple
 
     @Override
     public Mutation deserialize(Mutation mutation) throws IOException {
-      Mutate proto = Mutate.parseDelimitedFrom(in);
+      MutationProto proto = MutationProto.parseDelimitedFrom(in);
       return ProtobufUtil.toMutation(proto);
     }
 
@@ -82,15 +82,15 @@ public class MutationSerialization imple
 
     @Override
     public void serialize(Mutation mutation) throws IOException {
-      MutateType type;
+      MutationType type;
       if (mutation instanceof Put) {
-        type = MutateType.PUT;
+        type = MutationType.PUT;
       } else if (mutation instanceof Delete) {
-        type = MutateType.DELETE;
+        type = MutationType.DELETE;
       } else {
         throw new IllegalArgumentException("Only Put and Delete are supported");
       }
-      ProtobufUtil.toMutate(type, mutation).writeDelimitedTo(out);
+      ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out);
     }
   }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java Wed Mar 20 19:36:46 2013
@@ -19,7 +19,8 @@
 package org.apache.hadoop.hbase.monitoring;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+
+import com.google.protobuf.Message;
 
 /**
  * A MonitoredTask implementation optimized for use with RPC Handlers 
@@ -37,8 +38,7 @@ public interface MonitoredRPCHandler ext
   public abstract boolean isRPCRunning();
   public abstract boolean isOperationRunning();
 
-  public abstract void setRPC(String methodName, Object [] params,
-      long queueTime);
-  public abstract void setRPCPacket(RpcRequestBody param);
+  public abstract void setRPC(String methodName, Object [] params, long queueTime);
+  public abstract void setRPCPacket(Message param);
   public abstract void setConnection(String clientAddress, int remotePort);
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java Wed Mar 20 19:36:46 2013
@@ -18,19 +18,15 @@
  */
 package org.apache.hadoop.hbase.monitoring;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Operation;
-import org.apache.hadoop.hbase.io.WritableWithSize;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
 
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.protobuf.Message;
 
 /**
  * A MonitoredTask implementation designed for use with RPC Handlers 
@@ -46,7 +42,7 @@ public class MonitoredRPCHandlerImpl ext
   private long rpcStartTime;
   private String methodName = "";
   private Object [] params = {};
-  private RpcRequestBody packet;
+  private Message packet;
 
   public MonitoredRPCHandlerImpl() {
     super();
@@ -201,7 +197,7 @@ public class MonitoredRPCHandlerImpl ext
    * that it can later compute its size if asked for it.
    * @param param The protobuf received by the RPC for this call
    */
-  public void setRPCPacket(RpcRequestBody param) {
+  public void setRPCPacket(Message param) {
     this.packet = param;
   }
 
@@ -257,4 +253,4 @@ public class MonitoredRPCHandlerImpl ext
     }
     return super.toString() + ", rpcMethod=" + getRPC();
   }
-}
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java Wed Mar 20 19:36:46 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -205,8 +206,9 @@ public class ZKProcedureMemberRpcs imple
       byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
       LOG.debug("start proc data length is " + data.length);
       if (!ProtobufUtil.isPBMagicPrefix(data)) {
-        String msg = "Data in for starting procuedure " + opName + " is illegally formatted. "
-            + "Killing the procedure.";
+        String msg = "Data in for starting procuedure " + opName +
+          " is illegally formatted (no pb magic). " +
+          "Killing the procedure: " + Bytes.toString(data);
         LOG.error(msg);
         throw new IllegalArgumentException(msg);
       }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Mar 20 19:36:46 2013
@@ -25,7 +25,6 @@ import java.lang.annotation.RetentionPol
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
 import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -58,14 +57,17 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.exceptions.ClockOutOfSyncException;
-import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HealthCheckChore;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
 import org.apache.hadoop.hbase.exceptions.LeaseException;
 import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
@@ -107,6 +109,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
 import org.apache.hadoop.hbase.ipc.RpcClientEngine;
 import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -156,10 +159,10 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -168,7 +171,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
@@ -212,11 +214,11 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.zookeeper.KeeperException;
 import org.cliffc.high_scale_lib.Counter;
 
-import com.google.common.base.Function;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
 
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -458,8 +460,7 @@ public class HRegionServer implements Cl
 
     // Config'ed params
     this.numRetries = conf.getInt("hbase.client.retries.number", 10);
-    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
-      10 * 1000);
+    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
 
     this.sleeper = new Sleeper(this.msgInterval, this);
@@ -507,7 +508,7 @@ public class HRegionServer implements Cl
     this.isa = this.rpcServer.getListenerAddress();
 
     this.rpcServer.setErrorHandler(this);
-    this.rpcServer.setQosFunction((qosFunction = new QosFunction()));
+    this.rpcServer.setQosFunction((qosFunction = new QosFunction(this)));
     this.startcode = System.currentTimeMillis();
 
     // login the zookeeper client principal (if using security)
@@ -567,152 +568,6 @@ public class HRegionServer implements Cl
   }
 
   /**
-   * Utility used ensuring higher quality of service for priority rpcs; e.g.
-   * rpcs to .META., etc.
-   */
-  class QosFunction implements Function<RpcRequestBody,Integer> {
-    private final Map<String, Integer> annotatedQos;
-    //We need to mock the regionserver instance for some unit tests (set via
-    //setRegionServer method.
-    //The field value is initially set to the enclosing instance of HRegionServer.
-    private HRegionServer hRegionServer = HRegionServer.this;
-
-    //The logic for figuring out high priority RPCs is as follows:
-    //1. if the method is annotated with a QosPriority of QOS_HIGH,
-    //   that is honored
-    //2. parse out the protobuf message and see if the request is for meta
-    //   region, and if so, treat it as a high priority RPC
-    //Some optimizations for (2) are done here -
-    //Clients send the argument classname as part of making the RPC. The server
-    //decides whether to deserialize the proto argument message based on the
-    //pre-established set of argument classes (knownArgumentClasses below).
-    //This prevents the server from having to deserialize all proto argument
-    //messages prematurely.
-    //All the argument classes declare a 'getRegion' method that returns a
-    //RegionSpecifier object. Methods can be invoked on the returned object
-    //to figure out whether it is a meta region or not.
-    @SuppressWarnings("unchecked")
-    private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
-        GetRegionInfoRequest.class,
-        GetStoreFileRequest.class,
-        CloseRegionRequest.class,
-        FlushRegionRequest.class,
-        SplitRegionRequest.class,
-        CompactRegionRequest.class,
-        GetRequest.class,
-        MutateRequest.class,
-        ScanRequest.class,
-        MultiRequest.class
-    };
-
-    //Some caches for helping performance
-    private final Map<String, Class<? extends Message>> argumentToClassMap =
-        new HashMap<String, Class<? extends Message>>();
-    private final Map<String, Map<Class<? extends Message>, Method>>
-      methodMap = new HashMap<String, Map<Class<? extends Message>, Method>>();
-
-    public QosFunction() {
-      Map<String, Integer> qosMap = new HashMap<String, Integer>();
-      for (Method m : HRegionServer.class.getMethods()) {
-        QosPriority p = m.getAnnotation(QosPriority.class);
-        if (p != null) {
-          qosMap.put(m.getName(), p.priority());
-        }
-      }
-
-      annotatedQos = qosMap;
-      if (methodMap.get("parseFrom") == null) {
-        methodMap.put("parseFrom",
-            new HashMap<Class<? extends Message>, Method>());
-      }
-      if (methodMap.get("getRegion") == null) {
-        methodMap.put("getRegion",
-            new HashMap<Class<? extends Message>, Method>());
-      }
-      for (Class<? extends Message> cls : knownArgumentClasses) {
-        argumentToClassMap.put(cls.getCanonicalName(), cls);
-        try {
-          methodMap.get("parseFrom").put(cls,
-                          cls.getDeclaredMethod("parseFrom",ByteString.class));
-          methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-
-    void setRegionServer(HRegionServer server) {
-      this.hRegionServer = server;
-    }
-
-    public boolean isMetaRegion(byte[] regionName) {
-      HRegion region;
-      try {
-        region = hRegionServer.getRegion(regionName);
-      } catch (NotServingRegionException ignored) {
-        return false;
-      }
-      return region.getRegionInfo().isMetaRegion();
-    }
-
-    @Override
-    public Integer apply(RpcRequestBody from) {
-      String methodName = from.getMethodName();
-      Class<? extends Message> rpcArgClass = null;
-      if (from.hasRequestClassName()) {
-        String cls = from.getRequestClassName();
-        rpcArgClass = argumentToClassMap.get(cls);
-      }
-
-      Integer priorityByAnnotation = annotatedQos.get(methodName);
-      if (priorityByAnnotation != null) {
-        return priorityByAnnotation;
-      }
-
-      if (rpcArgClass == null || from.getRequest().isEmpty()) {
-        return HConstants.NORMAL_QOS;
-      }
-      Object deserializedRequestObj;
-      //check whether the request has reference to Meta region
-      try {
-        Method parseFrom = methodMap.get("parseFrom").get(rpcArgClass);
-        deserializedRequestObj = parseFrom.invoke(null, from.getRequest());
-        Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
-        RegionSpecifier regionSpecifier =
-            (RegionSpecifier)getRegion.invoke(deserializedRequestObj,
-                (Object[])null);
-        HRegion region = hRegionServer.getRegion(regionSpecifier);
-        if (region.getRegionInfo().isMetaTable()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("High priority: " + from.toString());
-          }
-          return HConstants.HIGH_QOS;
-        }
-      } catch (Exception ex) {
-        throw new RuntimeException(ex);
-      }
-
-      if (methodName.equals("scan")) { // scanner methods...
-        ScanRequest request = (ScanRequest)deserializedRequestObj;
-        if (!request.hasScannerId()) {
-          return HConstants.NORMAL_QOS;
-        }
-        RegionScanner scanner = hRegionServer.getScanner(request.getScannerId());
-        if (scanner != null && scanner.getRegionInfo().isMetaTable()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("High priority scanner request: " + request.getScannerId());
-          }
-          return HConstants.HIGH_QOS;
-        }
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Low priority: " + from.toString());
-      }
-      return HConstants.NORMAL_QOS;
-    }
-  }
-
-  /**
    * All initialization needed before we go register with Master.
    *
    * @throws IOException
@@ -1448,8 +1303,8 @@ public class HRegionServer implements Cl
       Path logdir = new Path(rootDir, logName);
       if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
 
-      this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), 
-          rootDir, logName, this.conf, getMetaWALActionListeners(), 
+      this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(),
+          rootDir, logName, this.conf, getMetaWALActionListeners(),
           this.serverNameFromMasterPOV.toString());
     }
     return this.hlogForMeta;
@@ -1551,7 +1406,7 @@ public class HRegionServer implements Cl
       ".compactionChecker", uncaughtExceptionHandler);
     if (this.healthCheckChore != null) {
     Threads
-        .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker", 
+        .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
             uncaughtExceptionHandler);
     }
 
@@ -1645,17 +1500,17 @@ public class HRegionServer implements Cl
       return getWAL(null);
     } catch (IOException e) {
       LOG.warn("getWAL threw exception " + e);
-      return null; 
+      return null;
     }
   }
 
   @Override
   public HLog getWAL(HRegionInfo regionInfo) throws IOException {
     //TODO: at some point this should delegate to the HLogFactory
-    //currently, we don't care about the region as much as we care about the 
+    //currently, we don't care about the region as much as we care about the
     //table.. (hence checking the tablename below)
-    //_ROOT_ and .META. regions have separate WAL. 
-    if (regionInfo != null && 
+    //_ROOT_ and .META. regions have separate WAL.
+    if (regionInfo != null &&
         regionInfo.isMetaTable()) {
       return getMetaWAL();
     }
@@ -1749,15 +1604,15 @@ public class HRegionServer implements Cl
       if (cause != null) {
         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
       }
-      if (hbaseMaster != null) {
+      // Report to the master but only if we have already registered with the master.
+      if (hbaseMaster != null && this.serverNameFromMasterPOV != null) {
         ReportRSFatalErrorRequest.Builder builder =
           ReportRSFatalErrorRequest.newBuilder();
         ServerName sn =
           ServerName.parseVersionedServerName(this.serverNameFromMasterPOV.getVersionedBytes());
         builder.setServer(ProtobufUtil.toServerName(sn));
         builder.setErrorMessage(msg);
-        hbaseMaster.reportRSFatalError(
-          null,builder.build());
+        hbaseMaster.reportRSFatalError(null, builder.build());
       }
     } catch (Throwable t) {
       LOG.warn("Unable to report fatal error to master", t);
@@ -2805,33 +2660,39 @@ public class HRegionServer implements Cl
   /**
    * Mutate data in a table.
    *
-   * @param controller the RPC controller
+   * @param rpcc the RPC controller
    * @param request the mutate request
    * @throws ServiceException
    */
   @Override
-  public MutateResponse mutate(final RpcController controller,
+  public MutateResponse mutate(final RpcController rpcc,
       final MutateRequest request) throws ServiceException {
+    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
+    // It is also the conduit via which we pass back data.
+    PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
+    CellScanner cellScanner = controller != null? controller.cellScanner(): null;
+    // Clear scanner so we are not holding on to reference across call.
+    controller.setCellScanner(null);
     try {
       requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       MutateResponse.Builder builder = MutateResponse.newBuilder();
-      Mutate mutate = request.getMutate();
+      MutationProto mutation = request.getMutation();
       if (!region.getRegionInfo().isMetaTable()) {
         cacheFlusher.reclaimMemStoreMemory();
       }
       Result r = null;
       Boolean processed = null;
-      MutateType type = mutate.getMutateType();
+      MutationType type = mutation.getMutateType();
       switch (type) {
       case APPEND:
-        r = append(region, mutate);
+        r = append(region, mutation, cellScanner);
         break;
       case INCREMENT:
-        r = increment(region, mutate);
+        r = increment(region, mutation, cellScanner);
         break;
       case PUT:
-        Put put = ProtobufUtil.toPut(mutate);
+        Put put = ProtobufUtil.toPut(mutation, cellScanner);
         if (request.hasCondition()) {
           Condition condition = request.getCondition();
           byte[] row = condition.getRow().toByteArray();
@@ -2859,7 +2720,7 @@ public class HRegionServer implements Cl
         }
         break;
       case DELETE:
-        Delete delete = ProtobufUtil.toDelete(mutate);
+        Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
         if (request.hasCondition()) {
           Condition condition = request.getCondition();
           byte[] row = condition.getRow().toByteArray();
@@ -2890,10 +2751,15 @@ public class HRegionServer implements Cl
           throw new DoNotRetryIOException(
             "Unsupported mutate type: " + type.name());
       }
+      CellScannable cellsToReturn = null;
       if (processed != null) {
         builder.setProcessed(processed.booleanValue());
       } else if (r != null) {
-        builder.setResult(ProtobufUtil.toResult(r));
+        builder.setResult(ProtobufUtil.toResultNoData(r));
+        cellsToReturn = r;
+      }
+      if (cellsToReturn != null) {
+        controller.setCellScanner(cellsToReturn.cellScanner());
       }
       return builder.build();
     } catch (IOException ie) {
@@ -3006,7 +2872,8 @@ public class HRegionServer implements Cl
             if (rsh != null) {
               if (request.getNextCallSeq() != rsh.nextCallSeq) {
                 throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
-                    + " But the nextCallSeq got from client: " + request.getNextCallSeq());
+                  + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
+                  "; request=" + TextFormat.shortDebugString(request));
               }
               // Increment the nextCallSeq value which is the next expected from client.
               rsh.nextCallSeq++;
@@ -3208,47 +3075,61 @@ public class HRegionServer implements Cl
   /**
    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
    *
-   * @param controller the RPC controller
+   * @param rpcc the RPC controller
    * @param request the multi request
    * @throws ServiceException
    */
   @Override
-  public MultiResponse multi(final RpcController controller,
-      final MultiRequest request) throws ServiceException {
+  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
+  throws ServiceException {
+    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
+    // It is also the conduit via which we pass back data.
+    PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
+    CellScanner cellScanner = controller != null? controller.cellScanner(): null;
+    // Clear scanner so we are not holding on to reference across call.
+    controller.setCellScanner(null);
+    List<CellScannable> cellsToReturn = null;
     try {
       HRegion region = getRegion(request.getRegion());
       MultiResponse.Builder builder = MultiResponse.newBuilder();
+      List<MutationProto> mutations = new ArrayList<MutationProto>(request.getActionCount());
+      // Do a bunch of mutations atomically.  Mutations are Puts and Deletes.  NOT Gets.
       if (request.hasAtomic() && request.getAtomic()) {
-        List<Mutate> mutates = new ArrayList<Mutate>();
+        // MultiAction is union type.  Has a Get or a Mutate.
         for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
-          if (actionUnion.hasMutate()) {
-            mutates.add(actionUnion.getMutate());
+          if (actionUnion.hasMutation()) {
+            mutations.add(actionUnion.getMutation());
           } else {
-            throw new DoNotRetryIOException(
-              "Unsupported atomic action type: " + actionUnion);
+            throw new DoNotRetryIOException("Unsupported atomic action type: " + actionUnion);
           }
         }
-        mutateRows(region, mutates);
+        // TODO: We are not updating a metric here.  Should we up requestCount?
+        if (!mutations.isEmpty()) mutateRows(region, mutations, cellScanner);
       } else {
+        // Do a bunch of Actions.
         ActionResult.Builder resultBuilder = null;
-        List<Mutate> mutates = new ArrayList<Mutate>();
+        cellsToReturn = new ArrayList<CellScannable>(request.getActionCount());
         for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
-          requestCount.increment();
+          this.requestCount.increment();
+          ClientProtos.Result result = null;
           try {
-            ClientProtos.Result result = null;
             if (actionUnion.hasGet()) {
               Get get = ProtobufUtil.toGet(actionUnion.getGet());
               Result r = region.get(get);
               if (r != null) {
-                result = ProtobufUtil.toResult(r);
+                // Get a result with no data.  The data will be carried alongside pbs, not as pbs.
+                result = ProtobufUtil.toResultNoData(r);
+                // Add the Result to controller so it gets serialized apart from pb.  Get
+                // Results could be big so good if they are not serialized as pb.
+                cellsToReturn.add(r);
               }
-            } else if (actionUnion.hasMutate()) {
-              Mutate mutate = actionUnion.getMutate();
-              MutateType type = mutate.getMutateType();
-              if (type != MutateType.PUT && type != MutateType.DELETE) {
-                if (!mutates.isEmpty()) {
-                  doBatchOp(builder, region, mutates);
-                  mutates.clear();
+            } else if (actionUnion.hasMutation()) {
+              MutationProto mutation = actionUnion.getMutation();
+              MutationType type = mutation.getMutateType();
+              if (type != MutationType.PUT && type != MutationType.DELETE) {
+                if (!mutations.isEmpty()) {
+                  doBatchOp(builder, region, mutations, cellScanner);
+                  mutations.clear();
                 } else if (!region.getRegionInfo().isMetaTable()) {
                   cacheFlusher.reclaimMemStoreMemory();
                 }
@@ -3256,22 +3137,23 @@ public class HRegionServer implements Cl
               Result r = null;
               switch (type) {
               case APPEND:
-                r = append(region, mutate);
+                r = append(region, mutation, cellScanner);
                 break;
               case INCREMENT:
-                r = increment(region, mutate);
+                r = increment(region, mutation, cellScanner);
                 break;
               case PUT:
-                mutates.add(mutate);
-                break;
               case DELETE:
-                mutates.add(mutate);
+                mutations.add(mutation);
                 break;
               default:
                 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
               }
               if (r != null) {
-                result = ProtobufUtil.toResult(r);
+                // Put the data into the cellsToReturn and the metadata about the result is all that
+                // we will pass back in the protobuf result.
+                result = ProtobufUtil.toResultNoData(r);
+                cellsToReturn.add(r);
               }
             } else {
               LOG.warn("Error: invalid action: " + actionUnion + ". "
@@ -3292,10 +3174,14 @@ public class HRegionServer implements Cl
             builder.addResult(ResponseConverter.buildActionResult(ie));
           }
         }
-        if (!mutates.isEmpty()) {
-          doBatchOp(builder, region, mutates);
+        if (!mutations.isEmpty()) {
+          doBatchOp(builder, region, mutations, cellScanner);
         }
       }
+      // Load the controller with the Cells to return.
+      if (cellsToReturn != null && !cellsToReturn.isEmpty()) {
+        controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
+      }
       return builder.build();
     } catch (IOException ie) {
       throw new ServiceException(ie);
@@ -3758,15 +3644,16 @@ public class HRegionServer implements Cl
    * Execute an append mutation.
    *
    * @param region
-   * @param mutate
+   * @param m
+   * @param cellScanner
    * @return result to return to client if default operation should be
    * bypassed as indicated by RegionObserver, null otherwise
    * @throws IOException
    */
   protected Result append(final HRegion region,
-      final Mutate mutate) throws IOException {
+      final MutationProto m, final CellScanner cellScanner) throws IOException {
     long before = EnvironmentEdgeManager.currentTimeMillis();
-    Append append = ProtobufUtil.toAppend(mutate);
+    Append append = ProtobufUtil.toAppend(m, cellScanner);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
       r = region.getCoprocessorHost().preAppend(append);
@@ -3785,14 +3672,15 @@ public class HRegionServer implements Cl
    * Execute an increment mutation.
    *
    * @param region
-   * @param mutate
+   * @param mutation
    * @return the Result
    * @throws IOException
    */
-  protected Result increment(final HRegion region,
-      final Mutate mutate) throws IOException {
+  protected Result increment(final HRegion region, final MutationProto mutation,
+      final CellScanner cells)
+  throws IOException {
     long before = EnvironmentEdgeManager.currentTimeMillis();
-    Increment increment = ProtobufUtil.toIncrement(mutate);
+    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
       r = region.getCoprocessorHost().preIncrement(increment);
@@ -3812,12 +3700,12 @@ public class HRegionServer implements Cl
    *
    * @param builder
    * @param region
-   * @param mutates
+   * @param mutations
    */
-  protected void doBatchOp(final MultiResponse.Builder builder,
-      final HRegion region, final List<Mutate> mutates) {
+  protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
+      final List<MutationProto> mutations, final CellScanner cells) {
     @SuppressWarnings("unchecked")
-    Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutates.size()];
+    Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutations.size()];
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
@@ -3825,21 +3713,20 @@ public class HRegionServer implements Cl
       resultBuilder.setValue(ClientProtos.Result.newBuilder().build());
       ActionResult result = resultBuilder.build();
       int i = 0;
-      for (Mutate m : mutates) {
+      for (MutationProto m : mutations) {
         Mutation mutation;
-        if (m.getMutateType() == MutateType.PUT) {
-          mutation = ProtobufUtil.toPut(m);
+        if (m.getMutateType() == MutationType.PUT) {
+          mutation = ProtobufUtil.toPut(m, cells);
           batchContainsPuts = true;
         } else {
-          mutation = ProtobufUtil.toDelete(m);
+          mutation = ProtobufUtil.toDelete(m, cells);
           batchContainsDelete = true;
         }
         mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, null);
         builder.addResult(result);
       }
 
-
-      requestCount.add(mutates.size());
+      requestCount.add(mutations.size());
       if (!region.getRegionInfo().isMetaTable()) {
         cacheFlusher.reclaimMemStoreMemory();
       }
@@ -3871,7 +3758,7 @@ public class HRegionServer implements Cl
       }
     } catch (IOException ie) {
       ActionResult result = ResponseConverter.buildActionResult(ie);
-      for (int i = 0, n = mutates.size(); i < n; i++) {
+      for (int i = 0; i < mutations.size(); i++) {
         builder.setResult(i, result);
       }
     }
@@ -3888,25 +3775,27 @@ public class HRegionServer implements Cl
    * Mutate a list of rows atomically.
    *
    * @param region
-   * @param mutates
+   * @param mutations
+   * @param cellScanner if non-null, the mutation data -- the Cell content.
    * @throws IOException
    */
-  protected void mutateRows(final HRegion region,
-      final List<Mutate> mutates) throws IOException {
-    Mutate firstMutate = mutates.get(0);
+  protected void mutateRows(final HRegion region, final List<MutationProto> mutations,
+      final CellScanner cellScanner)
+  throws IOException {
+    MutationProto firstMutate = mutations.get(0);
     if (!region.getRegionInfo().isMetaTable()) {
       cacheFlusher.reclaimMemStoreMemory();
     }
-    byte[] row = firstMutate.getRow().toByteArray();
+    byte [] row = firstMutate.getRow().toByteArray();
     RowMutations rm = new RowMutations(row);
-    for (Mutate mutate: mutates) {
-      MutateType type = mutate.getMutateType();
+    for (MutationProto mutate: mutations) {
+      MutationType type = mutate.getMutateType();
       switch (mutate.getMutateType()) {
       case PUT:
-        rm.add(ProtobufUtil.toPut(mutate));
+        rm.add(ProtobufUtil.toPut(mutate, cellScanner));
         break;
       case DELETE:
-        rm.add(ProtobufUtil.toDelete(mutate));
+        rm.add(ProtobufUtil.toDelete(mutate, cellScanner));
         break;
         default:
           throw new DoNotRetryIOException(

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.regionserver.HRegionServer.QosPriority;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
+
+
+/**
+ * A guava function that will return a priority for use by QoS facility in regionserver; e.g.
+ * rpcs to .META. and -ROOT-, etc., get priority.
+ */
+// TODO: Remove.  This is doing way too much work just to figure a priority.  Do as Elliott
+// suggests and just have the client specify a priority.
+
+//The logic for figuring out high priority RPCs is as follows:
+//1. if the method is annotated with a QosPriority of QOS_HIGH,
+//   that is honored
+//2. parse out the protobuf message and see if the request is for meta
+//   region, and if so, treat it as a high priority RPC
+//Some optimizations for (2) are done here -
+//Clients send the argument classname as part of making the RPC. The server
+//decides whether to deserialize the proto argument message based on the
+//pre-established set of argument classes (knownArgumentClasses below).
+//This prevents the server from having to deserialize all proto argument
+//messages prematurely.
+//All the argument classes declare a 'getRegion' method that returns a
+//RegionSpecifier object. Methods can be invoked on the returned object
+//to figure out whether it is a meta region or not.
+class QosFunction implements Function<Pair<RequestHeader, Message>, Integer> {
+  public static final Log LOG = LogFactory.getLog(QosFunction.class.getName());
+  private final Map<String, Integer> annotatedQos;
+  //We need to mock the regionserver instance for some unit tests (set via
+  //setRegionServer method).
+  private HRegionServer hRegionServer;
+  @SuppressWarnings("unchecked")
+  private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
+      GetRegionInfoRequest.class,
+      GetStoreFileRequest.class,
+      CloseRegionRequest.class,
+      FlushRegionRequest.class,
+      SplitRegionRequest.class,
+      CompactRegionRequest.class,
+      GetRequest.class,
+      MutateRequest.class,
+      ScanRequest.class,
+      MultiRequest.class
+  };
+
+  // Some caches for helping performance
+  private final Map<String, Class<? extends Message>> argumentToClassMap =
+    new HashMap<String, Class<? extends Message>>();
+  private final Map<String, Map<Class<? extends Message>, Method>> methodMap =
+    new HashMap<String, Map<Class<? extends Message>, Method>>();
+
+  QosFunction(final HRegionServer hrs) {
+    this.hRegionServer = hrs;
+    Map<String, Integer> qosMap = new HashMap<String, Integer>();
+    for (Method m : HRegionServer.class.getMethods()) {
+      QosPriority p = m.getAnnotation(QosPriority.class);
+      if (p != null) {
+        qosMap.put(m.getName(), p.priority());
+      }
+    }
+    this.annotatedQos = qosMap;
+
+    if (methodMap.get("getRegion") == null) {
+      methodMap.put("hasRegion", new HashMap<Class<? extends Message>, Method>());
+      methodMap.put("getRegion", new HashMap<Class<? extends Message>, Method>());
+    }
+    for (Class<? extends Message> cls : knownArgumentClasses) {
+      argumentToClassMap.put(cls.getName(), cls);
+      try {
+        methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
+        methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public boolean isMetaRegion(byte[] regionName) {
+    HRegion region;
+    try {
+      region = hRegionServer.getRegion(regionName);
+    } catch (NotServingRegionException ignored) {
+      return false;
+    }
+    return region.getRegionInfo().isMetaTable();
+  }
+
+  @Override
+  public Integer apply(Pair<RequestHeader, Message> headerAndParam) {
+    RequestHeader header = headerAndParam.getFirst();
+    String methodName = header.getMethodName();
+    Integer priorityByAnnotation = annotatedQos.get(header.getMethodName());
+    if (priorityByAnnotation != null) {
+      return priorityByAnnotation;
+    }
+
+    Message param = headerAndParam.getSecond();
+    if (param == null) {
+      return HConstants.NORMAL_QOS;
+    }
+    String cls = param.getClass().getName();
+    Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
+    RegionSpecifier regionSpecifier = null;
+    //check whether the request has a reference to the meta region or not.
+    try {
+      // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
+      // hasRegion returns true.  Not all listed methods have region specifier each time.  For
+      // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
+      // send the region over every time.
+      Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
+      if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) {
+        Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
+        regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
+        HRegion region = hRegionServer.getRegion(regionSpecifier);
+        if (region.getRegionInfo().isMetaTable()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("High priority: " + TextFormat.shortDebugString(param));
+          }
+          return HConstants.HIGH_QOS;
+        }
+      }
+    } catch (Exception ex) {
+      // Not good throwing an exception out of here, a runtime anyways.  Let the query go into the
+      // server and have it throw the exception if still an issue.  Just mark it normal priority.
+      if (LOG.isDebugEnabled()) LOG.debug("Marking normal priority after getting exception=" + ex);
+      return HConstants.NORMAL_QOS;
+    }
+
+    if (methodName.equals("scan")) { // scanner methods...
+      ScanRequest request = (ScanRequest)param;
+      if (!request.hasScannerId()) {
+        return HConstants.NORMAL_QOS;
+      }
+      RegionScanner scanner = hRegionServer.getScanner(request.getScannerId());
+      if (scanner != null && scanner.getRegionInfo().isMetaRegion()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("High priority scanner request: " + TextFormat.shortDebugString(request));
+        }
+        return HConstants.HIGH_QOS;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Low priority: " + TextFormat.shortDebugString(param));
+    }
+    return HConstants.NORMAL_QOS;
+  }
+
+  @VisibleForTesting
+  void setRegionServer(final HRegionServer hrs) {
+    this.hRegionServer = hrs;
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java Wed Mar 20 19:36:46 2013
@@ -279,7 +279,7 @@ public class IncrementCoalescer implemen
             LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", "
                 + Bytes.toStringBinary(row.getRowKey()) + ", "
                 + Bytes.toStringBinary(row.getFamily()) + ", "
-                + Bytes.toStringBinary(row.getQualifier()) + ", " + counter);
+                + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
           }
 
         }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java Wed Mar 20 19:36:46 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.ShutdownHook;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Utility used running a cluster all in the one JVM.
@@ -201,13 +203,22 @@ public class JVMClusterUtil {
     // Wait for an active master to be initialized (implies being master)
     //  with this, when we return the cluster is complete
     startTime = System.currentTimeMillis();
+    final int maxwait = 200000;
     while (true) {
       JVMClusterUtil.MasterThread t = findActiveMaster(masters);
       if (t != null && t.master.isInitialized()) {
         return t.master.getServerName().toString();
       }
-      if (System.currentTimeMillis() > startTime + 200000) {
-        throw new RuntimeException("Master not initialized after 200 seconds");
+      // REMOVE
+      if (System.currentTimeMillis() > startTime + 10000) {
+
+        Threads.sleep(1000);
+      }
+      if (System.currentTimeMillis() > startTime + maxwait) {
+        String msg = "Master not initialized after " + maxwait + "ms seconds";
+        ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
+          "Thread dump because: " + msg);
+        throw new RuntimeException(msg);
       }
       try {
         Thread.sleep(100);
@@ -279,8 +290,6 @@ public class JVMClusterUtil {
       }
     }
 
-
-
     if (masters != null) {
       for (JVMClusterUtil.MasterThread t : masters) {
         while (t.master.isAlive() && !wasInterrupted) {
@@ -306,4 +315,4 @@ public class JVMClusterUtil {
       Thread.currentThread().interrupt();
     }
   }
-}
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java Wed Mar 20 19:36:46 2013
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
@@ -50,6 +51,8 @@ import org.apache.hadoop.hbase.exception
 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
 import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.ipc.HBaseClient;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -60,6 +63,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.log4j.Level;
 import org.junit.*;
 import org.junit.experimental.categories.Category;
 
@@ -79,6 +83,9 @@ public class TestAdmin {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
     TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java Wed Mar 20 19:36:46 2013
@@ -23,14 +23,18 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
+import org.apache.hadoop.hbase.ipc.HBaseClient;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Level;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -55,6 +59,9 @@ public class TestClientScannerRPCTimeout
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
     conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Wed Mar 20 19:36:46 2013
@@ -47,9 +47,9 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -78,19 +78,23 @@ import org.apache.hadoop.hbase.filter.Wh
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.HBaseClient;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiRowMutationService;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
 import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -120,6 +124,9 @@ public class TestFromClientSide {
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
         MultiRowMutationEndpoint.class.getName());
@@ -4148,11 +4155,11 @@ public class TestFromClientSide {
     HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
     Put p = new Put(ROW);
     p.add(FAMILY, QUALIFIER, VALUE);
-    Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p);
+    MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
 
     p = new Put(ROW1);
     p.add(FAMILY, QUALIFIER, VALUE);
-    Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p);
+    MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, p);
 
     MultiMutateRequest.Builder mrmBuilder = MultiMutateRequest.newBuilder();
     mrmBuilder.addMutationRequest(m1);
@@ -4195,6 +4202,8 @@ public class TestFromClientSide {
     Delete d = new Delete(ROW);
     d.deleteColumns(FAMILY, QUALIFIERS[0]);
     arm.add(d);
+    // TODO: Try mutateRow again.  The batch was failing with only one try.
+    // t.mutateRow(arm);
     t.batch(Arrays.asList((Row)arm));
     r = t.get(g);
     assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java Wed Mar 20 19:36:46 2013
@@ -39,4 +39,4 @@ public class TestFromClientSideWithCopro
     // We need more than one region server in this test
     TEST_UTIL.startMiniCluster(SLAVES);
   }
-}
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Wed Mar 20 19:36:46 2013
@@ -660,6 +660,8 @@ public class TestHCM {
       }
     }
     assertNotNull(otherRow);
+    // If empty row, set it to first row.
+    if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
     Put put2 = new Put(otherRow);
     put2.add(FAM_NAM, otherRow, otherRow);
     table.put(put2); //cache put2's location

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java Wed Mar 20 19:36:46 2013
@@ -68,6 +68,9 @@ public class TestMultiParallel {
   private static final int slaves = 2; // also used for testing HTable pool size
 
   @BeforeClass public static void beforeClass() throws Exception {
+    ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     UTIL.startMiniCluster(slaves);
     HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
     UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.CellCodec;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.codec.MessageCodec;
+import org.apache.hadoop.hbase.io.CellOutputStream;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Do basic codec performance eval.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CodecPerformance {
+  public static final Log LOG = LogFactory.getLog(CodecPerformance.class);
+
+  static Cell [] getCells(final int howMany) {
+    Cell [] cells = new Cell[howMany];
+    for (int i = 0; i < howMany; i++) {
+      byte [] index = Bytes.toBytes(i);
+      KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, index);
+      cells[i] = kv;
+    }
+    return cells;
+  }
+
+  static int getRoughSize(final Cell [] cells) {
+    int size = 0;
+    for (Cell c: cells) {
+      size += c.getRowLength() + c.getFamilyLength() + c.getQualifierLength() + c.getValueLength();
+      size += Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
+    }
+    return size;
+  }
+
+  static byte [] runEncoderTest(final int index, final int initialBufferSize,
+      final ByteArrayOutputStream baos, final CellOutputStream encoder, final Cell [] cells)
+  throws IOException {
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < cells.length; i++) {
+      encoder.write(cells[i]);
+    }
+    encoder.flush();
+    LOG.info("" + index + " encoded count=" + cells.length + " in " +
+      (System.currentTimeMillis() - startTime) + "ms for encoder " + encoder);
+    // Ensure we did not have to grow the backing buffer.
+    assertTrue(baos.size() < initialBufferSize);
+    return baos.toByteArray();
+  }
+
+  static Cell [] runDecoderTest(final int index, final int count, final CellScanner decoder)
+  throws IOException {
+    Cell [] cells = new Cell[count];
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; decoder.advance(); i++) {
+      cells[i] = decoder.current();
+    }
+    LOG.info("" + index + " decoded count=" + cells.length + " in " +
+      (System.currentTimeMillis() - startTime) + "ms for decoder " + decoder);
+    // Ensure the array of decoded cells has the expected length.
+    assertTrue(cells.length == count);
+    return cells;
+  }
+
+  static void verifyCells(final Cell [] input, final Cell [] output) {
+    assertEquals(input.length, output.length);
+    for (int i = 0; i < input.length; i ++) {
+      input[i].equals(output[i]);
+    }
+  }
+
+  static void doCodec(final Codec codec, final Cell [] cells, final int cycles, final int count,
+      final int initialBufferSize)
+  throws IOException {
+    byte [] bytes = null;
+    Cell [] cellsDecoded = null;
+    for (int i = 0; i < cycles; i++) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(initialBufferSize);
+      Codec.Encoder encoder = codec.getEncoder(baos);
+      bytes = runEncoderTest(i, initialBufferSize, baos, encoder, cells);
+    }
+    for (int i = 0; i < cycles; i++) {
+      ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+      Codec.Decoder decoder = codec.getDecoder(bais);
+      cellsDecoded = CodecPerformance.runDecoderTest(i, count, decoder);
+    }
+    verifyCells(cells, cellsDecoded);
+  }
+
+  public static void main(String[] args) throws IOException {
+    // How many Cells to encode/decode on each cycle.
+    final int count = 100000;
+    // How many times to do an operation; repeat gives hotspot chance to warm up.
+    final int cycles = 30;
+
+    Cell [] cells = getCells(count);
+    int size = getRoughSize(cells);
+    int initialBufferSize = 2 * size; // Multiply by 2 to ensure we don't have to grow buffer
+
+    // Test KeyValue codec.
+    doCodec(new KeyValueCodec(), cells, cycles, count, initialBufferSize);
+    doCodec(new CellCodec(), cells, cycles, count, initialBufferSize);
+    doCodec(new MessageCodec(), cells, cycles, count, initialBufferSize);
+  }
+}
\ No newline at end of file

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.MessageCodec;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.io.CountingInputStream;
+import com.google.common.io.CountingOutputStream;
+
+@Category(SmallTests.class)
+public class TestCellMessageCodec {
+  public static final Log LOG = LogFactory.getLog(TestCellMessageCodec.class);
+
+  @Test
+  public void testEmptyWorks() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CountingOutputStream cos = new CountingOutputStream(baos);
+    DataOutputStream dos = new DataOutputStream(cos);
+    MessageCodec cmc = new MessageCodec();
+    Codec.Encoder encoder = cmc.getEncoder(dos);
+    encoder.flush();
+    dos.close();
+    long offset = cos.getCount();
+    assertEquals(0, offset);
+    CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    DataInputStream dis = new DataInputStream(cis);
+    Codec.Decoder decoder = cmc.getDecoder(dis);
+    assertFalse(decoder.advance());
+    dis.close();
+    assertEquals(0, cis.getCount());
+  }
+
+  @Test
+  public void testOne() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CountingOutputStream cos = new CountingOutputStream(baos);
+    DataOutputStream dos = new DataOutputStream(cos);
+    MessageCodec cmc = new MessageCodec();
+    Codec.Encoder encoder = cmc.getEncoder(dos);
+    final KeyValue kv =
+      new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
+    encoder.write(kv);
+    encoder.flush();
+    dos.close();
+    long offset = cos.getCount();
+    CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    DataInputStream dis = new DataInputStream(cis);
+    Codec.Decoder decoder = cmc.getDecoder(dis);
+    assertTrue(decoder.advance()); // First read should pull in the KV
+    assertFalse(decoder.advance()); // Second read should trip over the end-of-stream  marker and return false
+    dis.close();
+    assertEquals(offset, cis.getCount());
+  }
+
+  @Test
+  public void testThree() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CountingOutputStream cos = new CountingOutputStream(baos);
+    DataOutputStream dos = new DataOutputStream(cos);
+    MessageCodec cmc = new MessageCodec();
+    Codec.Encoder encoder = cmc.getEncoder(dos);
+    final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"), Bytes.toBytes("1"));
+    final KeyValue kv2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"), Bytes.toBytes("2"));
+    final KeyValue kv3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"), Bytes.toBytes("3"));
+    encoder.write(kv1);
+    encoder.write(kv2);
+    encoder.write(kv3);
+    encoder.flush();
+    dos.close();
+    long offset = cos.getCount();
+    CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    DataInputStream dis = new DataInputStream(cis);
+    Codec.Decoder decoder = cmc.getDecoder(dis);
+    assertTrue(decoder.advance());
+    Cell c = decoder.current();
+    assertTrue(CellComparator.equals(c, kv1));
+    assertTrue(decoder.advance());
+    c = decoder.current();
+    assertTrue(CellComparator.equals(c, kv2));
+    assertTrue(decoder.advance());
+    c = decoder.current();
+    assertTrue(CellComparator.equals(c, kv3));
+    assertFalse(decoder.advance());
+    dis.close();
+    assertEquals(offset, cis.getCount());
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterWithScanLimits.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterWithScanLimits.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterWithScanLimits.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterWithScanLimits.java Wed Mar 20 19:36:46 2013
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -33,12 +34,15 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
 import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.ipc.HBaseClient;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import org.junit.AfterClass;
@@ -46,6 +50,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import static org.junit.Assert.*;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.log4j.Level;
 import org.junit.experimental.categories.Category;
 
 /**
@@ -172,7 +177,9 @@ public class TestFilterWithScanLimits {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    Configuration config = TEST_UTIL.getConfiguration();
+    ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     TEST_UTIL.startMiniCluster(1);
     initialize(TEST_UTIL.getConfiguration());
   }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java Wed Mar 20 19:36:46 2013
@@ -44,6 +44,8 @@ import org.apache.log4j.spi.LoggingEvent
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.protobuf.ServiceException;
+
 /**
  * Test that delayed RPCs work. Fire up three calls, the first of which should
  * be delayed. Check that the last two, which are undelayed, return before the
@@ -100,8 +102,7 @@ public class TestDelayedRpc {
 
       assertEquals(UNDELAYED, results.get(0).intValue());
       assertEquals(UNDELAYED, results.get(1).intValue());
-      assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
-          0xDEADBEEF);
+      assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :  0xDEADBEEF);
     } finally {
       clientEngine.close();
     }
@@ -182,7 +183,7 @@ public class TestDelayedRpc {
   }
 
   public interface TestRpc extends IpcProtocol {
-    TestResponse test(TestArg delay);
+    TestResponse test(final Object rpcController, TestArg delay) throws ServiceException;
   }
 
   private static class TestRpcImpl implements TestRpc {
@@ -201,7 +202,8 @@ public class TestDelayedRpc {
     }
 
     @Override
-    public TestResponse test(final TestArg testArg) {
+    public TestResponse test(final Object rpcController, final TestArg testArg)
+    throws ServiceException {
       boolean delay = testArg.getDelay();
       TestResponse.Builder responseBuilder = TestResponse.newBuilder();
       if (!delay) {
@@ -243,9 +245,8 @@ public class TestDelayedRpc {
     @Override
     public void run() {
       try {
-        Integer result = 
-            new Integer(server.test(TestArg.newBuilder()
-                .setDelay(delay).build()).getResponse());
+        Integer result = new Integer(server.test(null, TestArg.newBuilder().setDelay(delay).
+          build()).getResponse());
         if (results != null) {
           synchronized (results) {
             results.add(result);
@@ -276,7 +277,7 @@ public class TestDelayedRpc {
       int result = 0xDEADBEEF;
 
       try {
-        result = client.test(TestArg.newBuilder().setDelay(false).build()).getResponse();
+        result = client.test(null, TestArg.newBuilder().setDelay(false).build()).getResponse();
       } catch (Exception e) {
         fail("No exception should have been thrown.");
       }
@@ -284,7 +285,7 @@ public class TestDelayedRpc {
 
       boolean caughtException = false;
       try {
-        result = client.test(TestArg.newBuilder().setDelay(true).build()).getResponse();
+        result = client.test(null, TestArg.newBuilder().setDelay(true).build()).getResponse();
       } catch(Exception e) {
         // Exception thrown by server is enclosed in a RemoteException.
         if (e.getCause().getMessage().contains(
@@ -303,7 +304,7 @@ public class TestDelayedRpc {
    */
   private static class FaultyTestRpc implements TestRpc {
     @Override
-    public TestResponse test(TestArg arg) {
+    public TestResponse test(Object rpcController, TestArg arg) {
       if (!arg.getDelay())
         return TestResponse.newBuilder().setResponse(UNDELAYED).build();
       Delayable call = HBaseServer.getCurrentCall();



Mime
View raw message