hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1527687 [3/3] - in /hbase/branches/0.96: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-protocol/ hbase-...
Date Mon, 30 Sep 2013 17:59:11 GMT
Modified: hbase/branches/0.96/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java?rev=1527687&r1=1527686&r2=1527687&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
(original)
+++ hbase/branches/0.96/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
Mon Sep 30 17:59:10 2013
@@ -3662,6 +3662,26 @@ public final class RPCProtos {
      * </pre>
      */
     org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMetaOrBuilder getCellBlockMetaOrBuilder();
+
+    // optional uint32 priority = 6;
+    /**
+     * <code>optional uint32 priority = 6;</code>
+     *
+     * <pre>
+     * 0 is NORMAL priority.  100 is HIGH.  If no priority, treat it as NORMAL.
+     * See HConstants.
+     * </pre>
+     */
+    boolean hasPriority();
+    /**
+     * <code>optional uint32 priority = 6;</code>
+     *
+     * <pre>
+     * 0 is NORMAL priority.  100 is HIGH.  If no priority, treat it as NORMAL.
+     * See HConstants.
+     * </pre>
+     */
+    int getPriority();
   }
   /**
    * Protobuf type {@code RequestHeader}
@@ -3759,6 +3779,11 @@ public final class RPCProtos {
               bitField0_ |= 0x00000010;
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              priority_ = input.readUInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3946,12 +3971,39 @@ public final class RPCProtos {
       return cellBlockMeta_;
     }
 
+    // optional uint32 priority = 6;
+    public static final int PRIORITY_FIELD_NUMBER = 6;
+    private int priority_;
+    /**
+     * <code>optional uint32 priority = 6;</code>
+     *
+     * <pre>
+     * 0 is NORMAL priority.  100 is HIGH.  If no priority, treat it as NORMAL.
+     * See HConstants.
+     * </pre>
+     */
+    public boolean hasPriority() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional uint32 priority = 6;</code>
+     *
+     * <pre>
+     * 0 is NORMAL priority.  100 is HIGH.  If no priority, treat it as NORMAL.
+     * See HConstants.
+     * </pre>
+     */
+    public int getPriority() {
+      return priority_;
+    }
+
     private void initFields() {
       callId_ = 0;
       traceInfo_ = org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo.getDefaultInstance();
       methodName_ = "";
       requestParam_ = false;
       cellBlockMeta_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance();
+      priority_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3980,6 +4032,9 @@ public final class RPCProtos {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeMessage(5, cellBlockMeta_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeUInt32(6, priority_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4009,6 +4064,10 @@ public final class RPCProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, cellBlockMeta_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(6, priority_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4057,6 +4116,11 @@ public final class RPCProtos {
         result = result && getCellBlockMeta()
             .equals(other.getCellBlockMeta());
       }
+      result = result && (hasPriority() == other.hasPriority());
+      if (hasPriority()) {
+        result = result && (getPriority()
+            == other.getPriority());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4090,6 +4154,10 @@ public final class RPCProtos {
         hash = (37 * hash) + CELL_BLOCK_META_FIELD_NUMBER;
         hash = (53 * hash) + getCellBlockMeta().hashCode();
       }
+      if (hasPriority()) {
+        hash = (37 * hash) + PRIORITY_FIELD_NUMBER;
+        hash = (53 * hash) + getPriority();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4223,6 +4291,8 @@ public final class RPCProtos {
           cellBlockMetaBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000010);
+        priority_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -4279,6 +4349,10 @@ public final class RPCProtos {
         } else {
           result.cellBlockMeta_ = cellBlockMetaBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.priority_ = priority_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4312,6 +4386,9 @@ public final class RPCProtos {
         if (other.hasCellBlockMeta()) {
           mergeCellBlockMeta(other.getCellBlockMeta());
         }
+        if (other.hasPriority()) {
+          setPriority(other.getPriority());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -4781,6 +4858,59 @@ public final class RPCProtos {
         return cellBlockMetaBuilder_;
       }
 
+      // optional uint32 priority = 6;
+      private int priority_ ;
+      /**
+       * <code>optional uint32 priority = 6;</code>
+       *
+       * <pre>
+       * 0 is NORMAL priority.  100 is HIGH.  If no priority, treat it as NORMAL.
+       * See HConstants.
+       * </pre>
+       */
+      public boolean hasPriority() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional uint32 priority = 6;</code>
+       *
+       * <pre>
+       * 0 is NORMAL priority.  100 is HIGH.  If no priority, treat it as NORMAL.
+       * See HConstants.
+       * </pre>
+       */
+      public int getPriority() {
+        return priority_;
+      }
+      /**
+       * <code>optional uint32 priority = 6;</code>
+       *
+       * <pre>
+       * 0 is NORMAL priority.  100 is HIGH.  If no priority, treat it as NORMAL.
+       * See HConstants.
+       * </pre>
+       */
+      public Builder setPriority(int value) {
+        bitField0_ |= 0x00000020;
+        priority_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint32 priority = 6;</code>
+       *
+       * <pre>
+       * 0 is NORMAL priority.  100 is HIGH.  If no priority, treat it as NORMAL.
+       * See HConstants.
+       * </pre>
+       */
+      public Builder clearPriority() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        priority_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:RequestHeader)
     }
 
@@ -5797,15 +5927,15 @@ public final class RPCProtos {
       "\001(\r\"|\n\021ExceptionResponse\022\034\n\024exception_cl" +
       "ass_name\030\001 \001(\t\022\023\n\013stack_trace\030\002 \001(\t\022\020\n\010h"
+
       "ostname\030\003 \001(\t\022\014\n\004port\030\004 \001(\005\022\024\n\014do_not_re",
-      "try\030\005 \001(\010\"\224\001\n\rRequestHeader\022\017\n\007call_id\030\001" +
+      "try\030\005 \001(\010\"\246\001\n\rRequestHeader\022\017\n\007call_id\030\001" +
       " \001(\r\022\035\n\ntrace_info\030\002 \001(\0132\t.RPCTInfo\022\023\n\013m" +
       "ethod_name\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\022" +
       "\'\n\017cell_block_meta\030\005 \001(\0132\016.CellBlockMeta" +
-      "\"q\n\016ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022%\n\te" +
-      "xception\030\002 \001(\0132\022.ExceptionResponse\022\'\n\017ce" +
-      "ll_block_meta\030\003 \001(\0132\016.CellBlockMetaB<\n*o" +
-      "rg.apache.hadoop.hbase.protobuf.generate" +
-      "dB\tRPCProtosH\001\240\001\001"
+      "\022\020\n\010priority\030\006 \001(\r\"q\n\016ResponseHeader\022\017\n\007" +
+      "call_id\030\001 \001(\r\022%\n\texception\030\002 \001(\0132\022.Excep" +
+      "tionResponse\022\'\n\017cell_block_meta\030\003 \001(\0132\016." +
+      "CellBlockMetaB<\n*org.apache.hadoop.hbase" +
+      ".protobuf.generatedB\tRPCProtosH\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5841,7 +5971,7 @@ public final class RPCProtos {
           internal_static_RequestHeader_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_RequestHeader_descriptor,
-              new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam",
"CellBlockMeta", });
+              new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam",
"CellBlockMeta", "Priority", });
           internal_static_ResponseHeader_descriptor =
             getDescriptor().getMessageTypes().get(5);
           internal_static_ResponseHeader_fieldAccessorTable = new

Modified: hbase/branches/0.96/hbase-protocol/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-protocol/src/main/protobuf/Client.proto?rev=1527687&r1=1527686&r2=1527687&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-protocol/src/main/protobuf/Client.proto (original)
+++ hbase/branches/0.96/hbase-protocol/src/main/protobuf/Client.proto Mon Sep 30 17:59:10
2013
@@ -318,42 +318,41 @@ message CoprocessorServiceResponse {
 }
 
 /**
- * An action that is part of MultiRequest.
- * This is a union type - exactly one of the fields will be set.
+ * Mutations to run against a Region.
  */
-message MultiAction {
-  optional MutationProto mutation = 1;
-  optional Get get = 2;
+message RegionMutation {
+  required RegionSpecifier region = 1;
+  // When set, run mutations as atomic unit.
+  optional bool atomic = 2;
+  repeated MutationProto mutation = 3;
 }
 
 /**
- * An individual action result. The result will in the
- * same order as the action in the request. If an action
- * returns a value, it is set in value field. If it doesn't
- * return anything, the result will be empty. If an action
- * fails to execute due to any exception, the exception
- * is returned as a stringified parameter.
+ * Either a Result or an Exception NameBytesPair (keyed by
+ * exception name, whose value is the stringified exception),
+ * or possibly empty if there is no result and no exception.
  */
-message ActionResult {
-  optional Result value = 1;
+message ResultOrException {
+  optional Result result = 1;
   optional NameBytesPair exception = 2;
 }
 
 /**
- * You can execute a list of actions on a given region in order.
- *
- * If it is a list of mutate actions, atomic can be set
- * to make sure they can be processed atomically, just like
- * RowMutations.
+ * The result of a RegionMutation.
+ */
+message RegionMutationResult {
+  repeated ResultOrException resultOrException = 1;
+}
+
+/**
+ * Execute a list of actions on a given region in order.
  */
 message MultiRequest {
-  required RegionSpecifier region = 1;
-  repeated MultiAction action = 2;
-  optional bool atomic = 3;
+  repeated RegionMutation regionMutation = 1;
 }
 
 message MultiResponse {
-  repeated ActionResult result = 1;
+  repeated RegionMutationResult regionMutationResult = 1;
 }
 
 

Modified: hbase/branches/0.96/hbase-protocol/src/main/protobuf/RPC.proto
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-protocol/src/main/protobuf/RPC.proto?rev=1527687&r1=1527686&r2=1527687&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-protocol/src/main/protobuf/RPC.proto (original)
+++ hbase/branches/0.96/hbase-protocol/src/main/protobuf/RPC.proto Mon Sep 30 17:59:10 2013
@@ -119,7 +119,9 @@ message RequestHeader {
   optional bool request_param = 4;
   // If present, then an encoded data block follows.
   optional CellBlockMeta cell_block_meta = 5;
-  // TODO: Have client specify priority
+  // 0 is NORMAL priority.  100 is HIGH.  If no priority, treat it as NORMAL.
+  // See HConstants.
+  optional uint32 priority = 6;
 }
 
 message ResponseHeader {

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1527687&r1=1527686&r2=1527687&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Mon Sep 30 17:59:10 2013
@@ -73,16 +73,16 @@ import org.apache.hadoop.hbase.DroppedSn
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RegionTooBusyException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
@@ -114,11 +114,9 @@ import org.apache.hadoop.hbase.ipc.RpcSe
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1527687&r1=1527686&r2=1527687&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Mon Sep 30 17:59:10 2013
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.ClockOutO
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HealthCheckChore;
@@ -90,7 +89,6 @@ import org.apache.hadoop.hbase.client.In
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -119,7 +117,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -137,7 +134,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
@@ -151,8 +147,8 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
@@ -169,6 +165,9 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionMutation;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionMutationResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
@@ -3273,8 +3272,7 @@ public class HRegionServer implements Cl
   /**
    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
    *
-   * @param rpcc the RPC controller
-   * @param request the multi request
+   * @param rpcc the RPC controller
+   * @param request the multi request
    * @throws ServiceException
    */
   @Override
@@ -3284,108 +3282,112 @@ public class HRegionServer implements Cl
     // It is also the conduit via which we pass back data.
     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
     CellScanner cellScanner = controller != null? controller.cellScanner(): null;
-    // Clear scanner so we are not holding on to reference across call.
-    controller.setCellScanner(null);
-    List<CellScannable> cellsToReturn = null;
-    try {
-      HRegion region = getRegion(request.getRegion());
-      MultiResponse.Builder builder = MultiResponse.newBuilder();
-      List<MutationProto> mutations = new ArrayList<MutationProto>(request.getActionCount());
-      // Do a bunch of mutations atomically.  Mutations are Puts and Deletes.  NOT Gets.
-      if (request.hasAtomic() && request.getAtomic()) {
-        // MultiAction is union type.  Has a Get or a Mutate.
-        for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
-          if (actionUnion.hasMutation()) {
-            mutations.add(actionUnion.getMutation());
-          } else {
-            throw new DoNotRetryIOException("Unsupported atomic action type: " + actionUnion);
-          }
-        }
-        // TODO: We are not updating a metric here.  Should we up requestCount?
-        if (!mutations.isEmpty()) mutateRows(region, mutations, cellScanner);
-      } else {
-        // Do a bunch of Actions.
-        ActionResult.Builder resultBuilder = null;
-        cellsToReturn = new ArrayList<CellScannable>(request.getActionCount());
-        for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
+    if (controller != null) controller.setCellScanner(null);
+    List<CellScannable> cellsToReturn = null; MultiResponse.Builder responseBuilder
= MultiResponse.newBuilder(); try { for (RegionMutation regionMutation: request.getRegionMutationList())
{
+        RegionMutationResult.Builder regionMutationResultBuilder = null;
+        HRegion region = getRegion(regionMutation.getRegion());
+        if (regionMutation.hasAtomic() && regionMutation.getAtomic()) {
           this.requestCount.increment();
-          ClientProtos.Result result = null;
-          try {
-            if (actionUnion.hasGet()) {
-              Get get = ProtobufUtil.toGet(actionUnion.getGet());
-              Result r = region.get(get);
-              if (r != null) {
-                // Get a result with no data.  The data will be carried alongside pbs, not
as pbs.
-                result = ProtobufUtil.toResultNoData(r);
-                // Add the Result to controller so it gets serialized apart from pb.  Get
-                // Results could be big so good if they are not serialized as pb.
-                cellsToReturn.add(r);
-              }
-            } else if (actionUnion.hasMutation()) {
-              MutationProto mutation = actionUnion.getMutation();
-              MutationType type = mutation.getMutateType();
-              if (type != MutationType.PUT && type != MutationType.DELETE) {
-                if (!mutations.isEmpty()) {
-                  doBatchOp(builder, region, mutations, cellScanner);
-                  mutations.clear();
-                } else if (!region.getRegionInfo().isMetaTable()) {
-                  cacheFlusher.reclaimMemStoreMemory();
-                }
-              }
-              Result r = null;
-              switch (type) {
-              case APPEND:
-                r = append(region, mutation, cellScanner);
-                break;
-              case INCREMENT:
-                r = increment(region, mutation, cellScanner);
-                break;
-              case PUT:
-              case DELETE:
-                mutations.add(mutation);
-                break;
-              default:
-                throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
-              }
-              if (r != null) {
-                // Put the data into the cellsToReturn and the metadata about the result
is all that
-                // we will pass back in the protobuf result.
-                result = ProtobufUtil.toResultNoData(r);
-                cellsToReturn.add(r);
-              }
-            } else {
-              LOG.warn("Error: invalid action: " + actionUnion + ". "
-                + "it must be a Get, Mutate, or Exec.");
-              throw new DoNotRetryIOException("Invalid action, "
-                + "it must be a Get, Mutate, or Exec.");
-            }
-            if (result != null) {
-              if (resultBuilder == null) {
-                resultBuilder = ActionResult.newBuilder();
-              } else {
-                resultBuilder.clear();
-              }
-              resultBuilder.setValue(result);
-              builder.addResult(resultBuilder.build());
-            }
-          } catch (IOException ie) {
-            builder.addResult(ResponseConverter.buildActionResult(ie));
-          }
-        }
-        if (!mutations.isEmpty()) {
-          doBatchOp(builder, region, mutations, cellScanner);
-        }
+          mutateRows(region, regionMutation.getMutationList(), cellScanner);
+        } else {
+          regionMutationResultBuilder = RegionMutationResult.newBuilder();
+          cellsToReturn = doNonAtomicRegionMutation(region, regionMutation, cellScanner,
+            regionMutationResultBuilder, cellsToReturn);
+        }
+        // Have one RegionMutationResult per RegionMutation, even if it is empty, so that
+        // results stay aligned with the order in which the requests came in.
+        responseBuilder.addRegionMutationResult(regionMutationResultBuilder == null?
+          RegionMutationResult.getDefaultInstance(): regionMutationResultBuilder.build());
       }
       // Load the controller with the Cells to return.
-      if (cellsToReturn != null && !cellsToReturn.isEmpty()) {
+      if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller
!= null) {
         controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
       }
-      return builder.build();
+      return responseBuilder.build();
     } catch (IOException ie) {
       throw new ServiceException(ie);
     }
   }
 
+  /**
+   * Run through the regionMutation <code>rm</code> and, per Mutation, do the work; when done,
+   * add an instance of a {@link ResultOrException} that corresponds to each Mutation.
+   * Contiguous Puts and Deletes are collected and handed to <code>doBatchOp</code> together
+   * rather than being applied one at a time.
+   * @param region the region the mutations are run against
+   * @param rm the mutations to run against <code>region</code>
+   * @param cellScanner handed on to the append, increment and doBatchOp calls
+   * @param builder the builder the {@link ResultOrException} instances are added to
+   * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
+   * method returns as a 'result'.
+   * @return the <code>cellsToReturn</code> list: the one passed in, or the one allocated here
+   * when none was passed and a Result had to be carried outside of the protobuf; may be null
+   */
+  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
+      final RegionMutation rm, final CellScanner cellScanner,
+      final RegionMutationResult.Builder builder, List<CellScannable> cellsToReturn) {
+    // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than
+    // do one at a time, we instead pass them in batch.  Be aware that the corresponding
+    // ResultOrException instance that matches each Put or Delete is then added down in the
+    // doBatchOp call.  We should be staying aligned though the Put and Delete are
+    // deferred/batched.
+    // Allocated lazily on the first Put or Delete; stays null if rm has none.
+    List<MutationProto> mutations = null;
+    for (ClientProtos.MutationProto m: rm.getMutationList()) {
+      ClientProtos.ResultOrException resultOrException = null;
+      try {
+        Result r = null;
+        MutationType type = m.getMutateType();
+        if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
+            !mutations.isEmpty()) {
+          // Flush out any Puts or Deletes already collected.
+          doBatchOp(builder, region, mutations, cellScanner);
+          mutations.clear();
+        }
+        switch (type) {
+        case APPEND:
+          r = append(region, m, cellScanner);
+          break;
+        case INCREMENT:
+          r = increment(region, m, cellScanner);
+          break;
+        case PUT:
+        case DELETE:
+          // Collect the individual mutations and apply in a batch
+          if (mutations == null) mutations =
+            new ArrayList<MutationProto>(rm.getMutationCount());
+          mutations.add(m);
+          break;
+        default:
+          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
+        }
+        if (r != null) {
+          ClientProtos.Result pbResult = null;
+          if (isClientCellBlockSupport()) {
+            // Only metadata goes in the protobuf; the Result itself is added to cellsToReturn.
+            pbResult = ProtobufUtil.toResultNoData(r);
+            //  Hard to guess the size here.  Just make a rough guess.
+            if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>(256);
+            cellsToReturn.add(r);
+          } else {
+            pbResult = ProtobufUtil.toResult(r);
+          }
+          resultOrException =
+            ClientProtos.ResultOrException.newBuilder().setResult(pbResult).build();
+        }
+        // Could get to here and there was no result and no exception.  Presumes we added
+        // a Put or Delete to the collecting Mutations List for adding later.  In this
+        // case the corresponding ResultOrException instance for the Put or Delete will be
+        // added down in the doBatchOp method call rather than up here.
+      } catch (IOException ie) {
+        resultOrException = ResultOrException.newBuilder().
+          setException(ResponseConverter.buildException(ie)).build();
+      }
+      if (resultOrException != null) {
+        builder.addResultOrException(resultOrException);
+      }
+    }
+    // Finish up any outstanding mutations.  The list is still null when rm held no Put or
+    // Delete (e.g. only Appends/Increments, or no mutations at all), so check before use.
+    if (mutations != null && !mutations.isEmpty()) {
+      doBatchOp(builder, region, mutations, cellScanner);
+    }
+    return cellsToReturn;
+  }
+
 // End Client methods
 // Start Admin methods
 
@@ -3861,20 +3863,20 @@ public class HRegionServer implements Cl
     try {
       checkOpen();
       List<WALEntry> entries = request.getEntryList();
-      if(entries == null || entries.isEmpty()) {
+      if (entries == null || entries.isEmpty()) {
         // empty input
         return ReplicateWALEntryResponse.newBuilder().build();
       }
-      
+
       HRegion region = this.getRegionByEncodedName(
         entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
       RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
       List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey,
WALEdit>>();
       List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType,
Mutation>>();
       for (WALEntry entry : entries) {
-        Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null : 
+        Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
           new Pair<HLogKey, WALEdit>();
-        List<Pair<MutationType, Mutation>> edits = HLogSplitter.getMutationsFromWALEntry(entry,

+        List<Pair<MutationType, Mutation>> edits = HLogSplitter.getMutationsFromWALEntry(entry,
           cells, walEntry);
         if (coprocessorHost != null) {
           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of
a
@@ -4046,15 +4048,13 @@ public class HRegionServer implements Cl
    * @param region
    * @param mutations
    */
-  protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
+  protected void doBatchOp(final RegionMutationResult.Builder builder, final HRegion region,
       final List<MutationProto> mutations, final CellScanner cells) {
     Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;
+    ResultOrException resultOrException = null;
     try {
-      ActionResult.Builder resultBuilder = ActionResult.newBuilder();
-      resultBuilder.setValue(ClientProtos.Result.newBuilder().build());
-      ActionResult result = resultBuilder.build();
       int i = 0;
       for (MutationProto m : mutations) {
         Mutation mutation;
@@ -4066,7 +4066,6 @@ public class HRegionServer implements Cl
           batchContainsDelete = true;
         }
         mArray[i++] = mutation;
-        builder.addResult(result);
       }
 
       requestCount.add(mutations.size());
@@ -4078,21 +4077,21 @@ public class HRegionServer implements Cl
       for (i = 0; i < codes.length; i++) {
         switch (codes[i].getOperationStatusCode()) {
           case BAD_FAMILY:
-            result = ResponseConverter.buildActionResult(
-                new NoSuchColumnFamilyException(codes[i].getExceptionMsg()));
-            builder.setResult(i, result);
+            resultOrException = ResponseConverter.buildActionResult(
+              new NoSuchColumnFamilyException(codes[i].getExceptionMsg()));
+            builder.setResultOrException(i, resultOrException);
             break;
 
           case SANITY_CHECK_FAILURE:
-            result = ResponseConverter.buildActionResult(
-                new FailedSanityCheckException(codes[i].getExceptionMsg()));
-            builder.setResult(i, result);
+            resultOrException = ResponseConverter.buildActionResult(
+              new FailedSanityCheckException(codes[i].getExceptionMsg()));
+            builder.setResultOrException(i, resultOrException);
             break;
 
           default:
-            result = ResponseConverter.buildActionResult(
-                new DoNotRetryIOException(codes[i].getExceptionMsg()));
-            builder.setResult(i, result);
+            resultOrException = ResponseConverter.buildActionResult(
+              new DoNotRetryIOException(codes[i].getExceptionMsg()));
+            builder.setResultOrException(i, resultOrException);
             break;
 
           case SUCCESS:
@@ -4100,9 +4099,9 @@ public class HRegionServer implements Cl
         }
       }
     } catch (IOException ie) {
-      ActionResult result = ResponseConverter.buildActionResult(ie);
+      resultOrException = ResponseConverter.buildActionResult(ie);
       for (int i = 0; i < mutations.size(); i++) {
-        builder.setResult(i, result);
+        builder.setResultOrException(i, resultOrException);
       }
     }
     long after = EnvironmentEdgeManager.currentTimeMillis();
@@ -4124,8 +4123,9 @@ public class HRegionServer implements Cl
    *         exceptionMessage if any
    * @throws IOException
    */
-  protected OperationStatus[] doBatchOp(final HRegion region,
-      final List<Pair<MutationType, Mutation>> mutations, boolean isReplay) throws IOException {
+  protected OperationStatus [] doBatchOp(final HRegion region,
+      final List<Pair<MutationType, Mutation>> mutations, boolean isReplay)
+  throws IOException {
     Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java?rev=1527687&r1=1527686&r2=1527687&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java
(original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java
Mon Sep 30 17:59:10 2013
@@ -83,8 +83,7 @@ class QosFunction implements Function<Pa
       CompactRegionRequest.class,
       GetRequest.class,
       MutateRequest.class,
-      ScanRequest.class,
-      MultiRequest.class
+      ScanRequest.class
   };
 
   // Some caches for helping performance
@@ -101,7 +100,7 @@ class QosFunction implements Function<Pa
       if (p != null) {
         // Since we protobuf'd, and then subsequently, when we went with pb style, method names
         // are capitalized.  This meant that this brittle compare of method names gotten by
-        // reflection no longer matched the method names comeing in over pb.  TODO: Get rid of this
+        // reflection no longer matched the method names coming in over pb.  TODO: Get rid of this
         // check.  For now, workaround is to capitalize the names we got from reflection so they
         // have chance of matching the pb ones.
         String capitalizedMethodName = capitalize(m.getName());
@@ -154,6 +153,11 @@ class QosFunction implements Function<Pa
     if (param == null) {
       return HConstants.NORMAL_QOS;
     }
+    if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) {
+      // The multi call has its priority set in the header.  All calls should work this way but
+      // only this one has been converted so far.  No priority == NORMAL_QOS.
+      return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
+    }
     String cls = param.getClass().getName();
     Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
     RegionSpecifier regionSpecifier = null;
@@ -203,4 +207,4 @@ class QosFunction implements Function<Pa
   void setRegionServer(final HRegionServer hrs) {
     this.hRegionServer = hrs;
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java?rev=1527687&r1=1527686&r2=1527687&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
(original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
Mon Sep 30 17:59:10 2013
@@ -33,16 +33,14 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -53,10 +51,10 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
-import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionMutationResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 
@@ -118,7 +116,7 @@ public class WALEditsReplaySink {
     HRegionLocation loc = null;
     HLog.Entry entry = null;
     List<HLog.Entry> regionEntries = null;
-    // Build the action list. 
+    // Build the action list.
     for (int i = 0; i < batchSize; i++) {
       loc = entries.get(i).getFirst();
       entry = entries.get(i).getSecond();
@@ -130,7 +128,7 @@ public class WALEditsReplaySink {
       }
       regionEntries.add(entry);
     }
-    
+
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
     // replaying edits by region
@@ -143,7 +141,7 @@ public class WALEditsReplaySink {
       for (; replayedActions < totalActions;) {
         curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE
                 : (totalActions - replayedActions);
-        replayEdits(loc, curRegion, allActions.subList(replayedActions, 
+        replayEdits(loc, curRegion, allActions.subList(replayedActions,
           replayedActions + curBatchSize));
         replayedActions += curBatchSize;
       }
@@ -185,7 +183,7 @@ public class WALEditsReplaySink {
       }
     }
   }
-  
+
   /**
    * Callable that handles the <code>replay</code> method call going against a single regionserver
    * @param <R>
@@ -202,7 +200,7 @@ public class WALEditsReplaySink {
       this.regionInfo = regionInfo;
       setLocation(regionLoc);
     }
-    
+
     @Override
     public ReplicateWALEntryResponse call() throws IOException {
       try {

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java?rev=1527687&r1=1527686&r2=1527687&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java
(original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java
Mon Sep 30 17:59:10 2013
@@ -20,8 +20,9 @@ import static org.junit.Assert.assertEqu
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.SmallTests;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
@@ -43,13 +44,20 @@ public class TestQosFunction {
     checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction);
     // Set method name in pb style with the method name capitalized.
     checkMethod("OpenRegion", HConstants.HIGH_QOS, qosFunction);
+    // Check multi works.
+    checkMethod("Multi", HConstants.NORMAL_QOS, qosFunction, MultiRequest.getDefaultInstance());
   }
 
   private void checkMethod(final String methodName, final int expected, final QosFunction qosf) {
+    checkMethod(methodName, expected, qosf, null);
+  }
+
+  private void checkMethod(final String methodName, final int expected,
+      final QosFunction qosf, final Message param) {
     RequestHeader.Builder builder = RequestHeader.newBuilder();
     builder.setMethodName(methodName);
     Pair<RequestHeader, Message> headerAndParam =
-      new Pair<RequestHeader, Message>(builder.build(), null);
+      new Pair<RequestHeader, Message>(builder.build(), param);
     assertEquals(methodName, expected, qosf.apply(headerAndParam).intValue());
   }
-}
\ No newline at end of file
+}



Mime
View raw message