hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1541703 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-client/src/test/java/org/apache/hadoop/hbase/client/ hbase-server/src/test/java/org/ap...
Date Wed, 13 Nov 2013 20:50:00 GMT
Author: stack
Date: Wed Nov 13 20:49:59 2013
New Revision: 1541703

URL: http://svn.apache.org/r1541703
Log:
HBASE-9907 Rig to fake a cluster so can profile client behaviors

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
    hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Wed Nov 13 20:49:59 2013
@@ -641,8 +641,7 @@ class AsyncProcess<CResult> {
   private void logAndResubmit(List<Action<Row>> initialActions, HRegionLocation oldLocation,
                               List<Action<Row>> toReplay, int numAttempt, int failureCount,
                               Throwable throwable,
-                              HConnectionManager.ServerErrorTracker errorsByServer){
-
+                              HConnectionManager.ServerErrorTracker errorsByServer) {
     if (toReplay.isEmpty()) {
       // it's either a success or a last failure
       if (failureCount != 0) {
@@ -789,22 +788,22 @@ class AsyncProcess<CResult> {
     StringBuilder sb = new StringBuilder();
 
     sb.append("#").append(id).append(", table=").append(tableName).
-        append(", Attempt #").append(numAttempt).append("/").append(numTries).append(" ");
+        append(", attempt=").append(numAttempt).append("/").append(numTries).append(" ");
 
     if (failureCount > 0 || error != null){
-      sb.append("failed ").append(failureCount).append(" ops").append(", last exception was: ").
-          append(error == null ? "null" : error.getMessage());
-    }else {
+      sb.append("failed ").append(failureCount).append(" ops").append(", last exception: ").
+          append(error == null ? "null" : error);
+    } else {
       sb.append("SUCCEEDED");
     }
 
-    sb.append(" on server ").append(sn);
+    sb.append(" on ").append(sn);
 
-    sb.append(", tracking started at ").append(startTime);
+    sb.append(", tracking started ").append(startTime);
 
     if (willRetry) {
-      sb.append(" - retrying after sleeping for ").append(backOffTime).append(" ms").
-          append(", will replay ").append(replaySize).append(" ops.");
+      sb.append(", retrying after ").append(backOffTime).append(" ms").
+          append(", replay ").append(replaySize).append(" ops.");
     } else if (failureCount > 0) {
       sb.append(" - FAILED, NOT RETRYING ANYMORE");
     }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Wed Nov 13 20:49:59 2013
@@ -41,10 +41,12 @@ import org.apache.hadoop.hbase.protobuf.
 
 /**
  * A cluster connection.  Knows how to find the master, locate regions out on the cluster,
- * keeps a cache of locations and then knows how to re-calibrate after they move.
- * {@link HConnectionManager} manages instances of this class.   This is NOT a connection to a
- * particular server but to all servers in the cluster.  Individual connections are managed at a
- * lower level.
+ * keeps a cache of locations and then knows how to re-calibrate after they move.  You need one
+ * of these to talk to your HBase cluster. {@link HConnectionManager} manages instances of this
+ * class.  See it for how to get one of these.
+ * 
+ * <p>This is NOT a connection to a particular server but to ALL servers in the cluster.  Individual
+ * connections are managed at a lower level.
  *
  * <p>HConnections are used by {@link HTable} mostly but also by
  * {@link HBaseAdmin}, and {@link CatalogTracker}.  HConnection instances can be shared.  Sharing

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Nov 13 20:49:59 2013
@@ -71,7 +71,10 @@ import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 
 /**
- * <p>Used to communicate with a single HBase table.
+ * <p>Used to communicate with a single HBase table.  An implementation of
+ * {@link HTableInterface}.  Instances of this class can be constructed directly but it is
+ * encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}.
+ * See {@link HConnectionManager} class comment for an example.
  *
  * <p>This class is not thread safe for reads nor write.
  *
@@ -336,7 +339,7 @@ public class HTable implements HTableInt
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
 
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
-    ap = new AsyncProcess<Object>(connection, tableName, pool, null, 
+    ap = new AsyncProcess<Object>(connection, tableName, pool, null,
         configuration, rpcCallerFactory);
 
     this.maxKeyValueSize = this.configuration.getInt(
@@ -1070,7 +1073,7 @@ public class HTable implements HTableInt
       throw new IOException(
           "Invalid arguments to incrementColumnValue", npe);
     }
-    
+
     RegionServerCallable<Long> callable =
       new RegionServerCallable<Long>(connection, getName(), row) {
         public Long call() throws IOException {
@@ -1525,7 +1528,7 @@ public class HTable implements HTableInt
 
   @Override
   public String toString() {
-    return tableName + ", " + connection;
+    return tableName + ";" + connection;
   }
 
   /**
@@ -1541,4 +1544,4 @@ public class HTable implements HTableInt
       t.close();
     }
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed Nov 13 20:49:59 2013
@@ -37,6 +37,7 @@ import java.util.Map;
 
 /**
  * Used to communicate with a single HBase table.
+ * Obtain an instance from an {@ink HConnection}.
  *
  * @since 0.21.0
  */

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Wed Nov 13 20:49:59 2013
@@ -31,10 +31,11 @@ import org.apache.hadoop.hbase.ipc.Paylo
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.protobuf.ServiceException;
 
@@ -65,20 +66,29 @@ class MultiServerCallable<R> extends Reg
     int countOfActions = this.multiAction.size();
     if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
     MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
+    RegionAction.Builder regionActionBuilder = RegionAction.newBuilder();
+    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+    MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
     List<CellScannable> cells = null;
     // The multi object is a list of Actions by region.  Iterate by region.
     for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
       final byte [] regionName = e.getKey();
       final List<Action<R>> actions = e.getValue();
-      RegionAction.Builder regionActionBuilder;
+      regionActionBuilder.clear();
+      regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
+        HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );
+
+
       if (this.cellBlock) {
         // Presize.  Presume at least a KV per Action.  There are likely more.
         if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
         // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
         // They have already been handled above. Guess at count of cells
-        regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells);
+        regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
+          regionActionBuilder, actionBuilder, mutationBuilder);
       } else {
-        regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions);
+        regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
+          regionActionBuilder, actionBuilder, mutationBuilder);
       }
       multiRequestBuilder.addRegionAction(regionActionBuilder.build());
     }
@@ -118,4 +128,4 @@ class MultiServerCallable<R> extends Reg
     // Use the location we were given in the constructor rather than go look it up.
     setStub(getConnection().getClient(getLocation().getServerName()));
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Nov 13 20:49:59 2013
@@ -988,8 +988,8 @@ public final class ProtobufUtil {
    * @param increment
    * @return the converted mutate
    */
-  public static MutationProto toMutation(final Increment increment) {
-    MutationProto.Builder builder = MutationProto.newBuilder();
+  public static MutationProto toMutation(final Increment increment,
+      final MutationProto.Builder builder) {
     builder.setRow(ZeroCopyLiteralByteString.wrap(increment.getRow()));
     builder.setMutateType(MutationType.INCREMENT);
     builder.setDurability(toDurability(increment.getDurability()));
@@ -1045,12 +1045,18 @@ public final class ProtobufUtil {
    */
   public static MutationProto toMutation(final MutationType type, final Mutation mutation)
   throws IOException {
-    MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation);
+    return toMutation(type, mutation, MutationProto.newBuilder());
+  }
+
+  public static MutationProto toMutation(final MutationType type, final Mutation mutation,
+      MutationProto.Builder builder)
+  throws IOException {
+    builder = getMutationBuilderAndSetCommonFields(type, mutation, builder);
     ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
     QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
     for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
+      columnBuilder.clear();
       columnBuilder.setFamily(ZeroCopyLiteralByteString.wrap(family.getKey()));
-      columnBuilder.clearQualifierValue();
       for (Cell cell: family.getValue()) {
         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
         valueBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(
@@ -1080,9 +1086,10 @@ public final class ProtobufUtil {
    * @return a protobuf'd Mutation
    * @throws IOException
    */
-  public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation)
+  public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation,
+      final MutationProto.Builder builder)
   throws IOException {
-    MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation);
+    getMutationBuilderAndSetCommonFields(type, mutation, builder);
     builder.setAssociatedCellCount(mutation.size());
     return builder.build();
   }
@@ -1095,8 +1102,7 @@ public final class ProtobufUtil {
    * @return A partly-filled out protobuf'd Mutation.
    */
   private static MutationProto.Builder getMutationBuilderAndSetCommonFields(final MutationType type,
-      final Mutation mutation) {
-    MutationProto.Builder builder = MutationProto.newBuilder();
+      final Mutation mutation, MutationProto.Builder builder) {
     builder.setRow(ZeroCopyLiteralByteString.wrap(mutation.getRow()));
     builder.setMutateType(type);
     builder.setDurability(toDurability(mutation.getDurability()));
@@ -2254,15 +2260,16 @@ public final class ProtobufUtil {
     // Doing this is going to kill us if we do it for all data passed.
     // St.Ack 20121205
     CellProtos.Cell.Builder kvbuilder = CellProtos.Cell.newBuilder();
-    kvbuilder.setRow(ByteString.copyFrom(kv.getRowArray(), kv.getRowOffset(),
+    kvbuilder.setRow(ZeroCopyLiteralByteString.wrap(kv.getRowArray(), kv.getRowOffset(),
       kv.getRowLength()));
-    kvbuilder.setFamily(ByteString.copyFrom(kv.getFamilyArray(),
+    kvbuilder.setFamily(ZeroCopyLiteralByteString.wrap(kv.getFamilyArray(),
       kv.getFamilyOffset(), kv.getFamilyLength()));
-    kvbuilder.setQualifier(ByteString.copyFrom(kv.getQualifierArray(),
+    kvbuilder.setQualifier(ZeroCopyLiteralByteString.wrap(kv.getQualifierArray(),
       kv.getQualifierOffset(), kv.getQualifierLength()));
     kvbuilder.setCellType(CellProtos.CellType.valueOf(kv.getTypeByte()));
     kvbuilder.setTimestamp(kv.getTimestamp());
-    kvbuilder.setValue(ByteString.copyFrom(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+    kvbuilder.setValue(ZeroCopyLiteralByteString.wrap(kv.getValueArray(), kv.getValueOffset(),
+      kv.getValueLength()));
     return kvbuilder.build();
   }
 

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Wed Nov 13 20:49:59 2013
@@ -218,7 +218,7 @@ public final class RequestConverter {
     builder.setRegion(region);
     Condition condition = buildCondition(
       row, family, qualifier, comparator, compareType);
-    builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put));
+    builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder()));
     builder.setCondition(condition);
     return builder.build();
   }
@@ -246,7 +246,8 @@ public final class RequestConverter {
     builder.setRegion(region);
     Condition condition = buildCondition(
       row, family, qualifier, comparator, compareType);
-    builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete));
+    builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete,
+      MutationProto.newBuilder()));
     builder.setCondition(condition);
     return builder.build();
   }
@@ -265,7 +266,7 @@ public final class RequestConverter {
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
-    builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put));
+    builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder()));
     return builder.build();
   }
 
@@ -283,7 +284,8 @@ public final class RequestConverter {
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
-    builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append));
+    builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append,
+      MutationProto.newBuilder()));
     return builder.build();
   }
 
@@ -300,7 +302,7 @@ public final class RequestConverter {
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
-    builder.setMutation(ProtobufUtil.toMutation(increment));
+    builder.setMutation(ProtobufUtil.toMutation(increment, MutationProto.newBuilder()));
     return builder.build();
   }
 
@@ -318,7 +320,8 @@ public final class RequestConverter {
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
-    builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete));
+    builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete,
+      MutationProto.newBuilder()));
     return builder.build();
   }
 
@@ -334,7 +337,10 @@ public final class RequestConverter {
   public static RegionAction.Builder buildRegionAction(final byte [] regionName,
       final RowMutations rowMutations)
   throws IOException {
-    RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
+    RegionAction.Builder builder =
+      getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
+    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+    MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
     for (Mutation mutation: rowMutations.getMutations()) {
       MutationType mutateType = null;
       if (mutation instanceof Put) {
@@ -345,8 +351,11 @@ public final class RequestConverter {
         throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
           mutation.getClass().getName());
       }
-      MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation);
-      builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build());
+      mutationBuilder.clear();
+      MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
+      actionBuilder.clear();
+      actionBuilder.setMutation(mp);
+      builder.addAction(actionBuilder.build());
     }
     return builder;
   }
@@ -363,9 +372,11 @@ public final class RequestConverter {
    * @throws IOException
    */
   public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
-      final RowMutations rowMutations, final List<CellScannable> cells)
+      final RowMutations rowMutations, final List<CellScannable> cells,
+      final RegionAction.Builder regionActionBuilder,
+      final ClientProtos.Action.Builder actionBuilder,
+      final MutationProto.Builder mutationBuilder)
   throws IOException {
-    RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
     for (Mutation mutation: rowMutations.getMutations()) {
       MutationType type = null;
       if (mutation instanceof Put) {
@@ -376,18 +387,20 @@ public final class RequestConverter {
         throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
           mutation.getClass().getName());
       }
-      MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation);
+      mutationBuilder.clear();
+      MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder);
       cells.add(mutation);
-      builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build());
+      actionBuilder.clear();
+      regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
     }
-    return builder;
+    return regionActionBuilder;
   }
 
-  private static RegionAction.Builder getRegionActionBuilderWithRegion(final byte [] regionName) {
-    RegionAction.Builder builder = RegionAction.newBuilder();
+  private static RegionAction.Builder getRegionActionBuilderWithRegion(
+      final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
     RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
-    builder.setRegion(region);
-    return builder;
+    regionActionBuilder.setRegion(region);
+    return regionActionBuilder;
   }
 
   /**
@@ -484,36 +497,37 @@ public final class RequestConverter {
    * @throws IOException
    */
   public static <R> RegionAction.Builder buildRegionAction(final byte[] regionName,
-      final List<Action<R>> actions)
+      final List<Action<R>> actions, final RegionAction.Builder regionActionBuilder,
+      final ClientProtos.Action.Builder actionBuilder,
+      final MutationProto.Builder mutationBuilder)
   throws IOException {
-    RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
-    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
     for (Action<R> action: actions) {
       Row row = action.getAction();
       actionBuilder.clear();
       actionBuilder.setIndex(action.getOriginalIndex());
+      mutationBuilder.clear();
       if (row instanceof Get) {
         Get g = (Get)row;
-        builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
+        regionActionBuilder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
       } else if (row instanceof Put) {
-        builder.addAction(actionBuilder.
-          setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row)));
+        regionActionBuilder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row, mutationBuilder)));
       } else if (row instanceof Delete) {
-        builder.addAction(actionBuilder.
-          setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row)));
+        regionActionBuilder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row, mutationBuilder)));
       } else if (row instanceof Append) {
-        builder.addAction(actionBuilder.
-          setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row)));
+        regionActionBuilder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row, mutationBuilder)));
       } else if (row instanceof Increment) {
-        builder.addAction(actionBuilder.
-          setMutation(ProtobufUtil.toMutation((Increment)row)));
+        regionActionBuilder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutation((Increment)row, mutationBuilder)));
       } else if (row instanceof RowMutations) {
         throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
       } else {
         throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
       }
     }
-    return builder;
+    return regionActionBuilder;
   }
 
   /**
@@ -533,14 +547,18 @@ public final class RequestConverter {
    * @throws IOException
    */
   public static <R> RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
-      final List<Action<R>> actions, final List<CellScannable> cells)
+      final List<Action<R>> actions, final List<CellScannable> cells,
+      final RegionAction.Builder regionActionBuilder,
+      final ClientProtos.Action.Builder actionBuilder,
+      final MutationProto.Builder mutationBuilder)
   throws IOException {
-    RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
-    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+    RegionAction.Builder builder =
+      getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
     for (Action<R> action: actions) {
       Row row = action.getAction();
       actionBuilder.clear();
       actionBuilder.setIndex(action.getOriginalIndex());
+      mutationBuilder.clear();
       if (row instanceof Get) {
         Get g = (Get)row;
         builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
@@ -548,7 +566,7 @@ public final class RequestConverter {
         Put p = (Put)row;
         cells.add(p);
         builder.addAction(actionBuilder.
-          setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p)));
+          setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p, mutationBuilder)));
       } else if (row instanceof Delete) {
         Delete d = (Delete)row;
         int size = d.size();
@@ -560,21 +578,21 @@ public final class RequestConverter {
         if (size > 0) {
           cells.add(d);
           builder.addAction(actionBuilder.
-            setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d)));
+            setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d, mutationBuilder)));
         } else {
           builder.addAction(actionBuilder.
-            setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d)));
+            setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d, mutationBuilder)));
         }
       } else if (row instanceof Append) {
         Append a = (Append)row;
         cells.add(a);
         builder.addAction(actionBuilder.
-          setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a)));
+          setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a, mutationBuilder)));
       } else if (row instanceof Increment) {
         Increment i = (Increment)row;
         cells.add(i);
         builder.addAction(actionBuilder.
-          setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i)));
+          setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i, mutationBuilder)));
       } else if (row instanceof RowMutations) {
         continue; // ignore RowMutations
       } else {

Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java (original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java Wed Nov 13 20:49:59 2013
@@ -22,41 +22,79 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
+import com.google.common.base.Stopwatch;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import com.google.protobuf.ZeroCopyLiteralByteString;
 
 /**
  * Test client behavior w/o setting up a cluster.
  * Mock up cluster emissions.
  */
 @Category(SmallTests.class)
-public class TestClientNoCluster {
+public class TestClientNoCluster extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class);
   private Configuration conf;
+  public static final ServerName META_SERVERNAME =
+    new ServerName("meta.example.org", 60010, 12345);
 
   @Before
   public void setUp() throws Exception {
@@ -71,7 +109,7 @@ public class TestClientNoCluster {
    * Simple cluster registry inserted in place of our usual zookeeper based one.
    */
   static class SimpleRegistry implements Registry {
-    final ServerName META_HOST = new ServerName("10.10.10.10", 60010, 12345);
+    final ServerName META_HOST = META_SERVERNAME;
 
     @Override
     public void init(HConnection connection) {
@@ -301,4 +339,456 @@ public class TestClientNoCluster {
       return this.stub;
     }
   }
+
+  /**
+   * Fake many regionservers and many regions on a connection implementation.
+   */
+  static class ManyServersManyRegionsConnection
+  extends HConnectionManager.HConnectionImplementation {
+    // All access should be synchronized
+    final Map<ServerName, ClientService.BlockingInterface> serversByClient;
+
+    /**
+     * Map of faked-up rows of a 'meta table'.
+     */
+    final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
+    final AtomicLong sequenceids = new AtomicLong(0);
+    private final Configuration conf;
+
+    ManyServersManyRegionsConnection(Configuration conf, boolean managed,
+        ExecutorService pool, User user)
+    throws IOException {
+      super(conf, managed, pool, user);
+      int serverCount = conf.getInt("hbase.test.servers", 10);
+      this.serversByClient =
+        new HashMap<ServerName, ClientService.BlockingInterface>(serverCount);
+      this.meta = makeMeta(Bytes.toBytes(
+        conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
+        conf.getInt("hbase.test.regions", 100),
+        conf.getLong("hbase.test.namespace.span", 1000),
+        serverCount);
+      this.conf = conf;
+    }
+
+    @Override
+    public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
+      // if (!sn.toString().startsWith("meta")) LOG.info(sn);
+      ClientService.BlockingInterface stub = null;
+      synchronized (this.serversByClient) {
+        stub = this.serversByClient.get(sn);
+        if (stub == null) {
+          stub = new FakeServer(this.conf, meta, sequenceids);
+          this.serversByClient.put(sn, stub);
+        }
+      }
+      return stub;
+    }
+  }
+
+  static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
+      final AtomicLong sequenceids, final MultiRequest request) {
+    // Make a response to match the request.  Act like there were no failures.
+    ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
+    // Per Region.
+    RegionActionResult.Builder regionActionResultBuilder =
+        RegionActionResult.newBuilder();
+    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
+    for (RegionAction regionAction: request.getRegionActionList()) {
+      regionActionResultBuilder.clear();
+      // Per Action in a Region.
+      for (ClientProtos.Action action: regionAction.getActionList()) {
+        roeBuilder.clear();
+        // Return empty Result and proper index as result.
+        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
+        roeBuilder.setIndex(action.getIndex());
+        regionActionResultBuilder.addResultOrException(roeBuilder.build());
+      }
+      builder.addRegionActionResult(regionActionResultBuilder.build());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Fake 'server'.
+   * Implements the ClientService responding as though it were a 'server' (presumes a new
+   * ClientService.BlockingInterface made per server).
+   */
+  static class FakeServer implements ClientService.BlockingInterface {
+    private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
+    private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
+    private final AtomicLong sequenceids;
+    private final long multiPause;
+    private final int tooManyMultiRequests;
+
+    FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
+        final AtomicLong sequenceids) {
+      this.meta = meta;
+      this.sequenceids = sequenceids;
+
+      // Pause to simulate the server taking time applying the edits.  This will drive up the
+      // number of threads used over in client.
+      this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
+      this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
+    }
+
+    @Override
+    public GetResponse get(RpcController controller, GetRequest request)
+    throws ServiceException {
+      boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(),
+        request.getRegion().getType());
+      if (!metaRegion) throw new UnsupportedOperationException();
+      return doMetaGetResponse(meta, request);
+    }
+
+    @Override
+    public MutateResponse mutate(RpcController controller,
+        MutateRequest request) throws ServiceException {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public ScanResponse scan(RpcController controller,
+        ScanRequest request) throws ServiceException {
+      // Presume it is a scan of meta for now. Not all scans provide a region spec expecting
+      // the server to keep reference by scannerid.  TODO.
+      return doMetaScanResponse(meta, sequenceids, request);
+    }
+
+    @Override
+    public BulkLoadHFileResponse bulkLoadHFile(
+        RpcController controller, BulkLoadHFileRequest request)
+        throws ServiceException {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public CoprocessorServiceResponse execService(
+        RpcController controller, CoprocessorServiceRequest request)
+        throws ServiceException {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public MultiResponse multi(RpcController controller, MultiRequest request)
+    throws ServiceException {
+      int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
+      try {
+        if (concurrentInvocations >= tooManyMultiRequests) {
+          throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" +
+           concurrentInvocations));
+        }
+        Threads.sleep(multiPause);
+        return doMultiResponse(meta, sequenceids, request);
+      } finally {
+        this.multiInvocationsCount.decrementAndGet();
+      }
+    }
+  }
+
+  static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
+      final AtomicLong sequenceids, final ScanRequest request) {
+    ScanResponse.Builder builder = ScanResponse.newBuilder();
+    int max = request.getNumberOfRows();
+    int count = 0;
+    Map<byte [], Pair<HRegionInfo, ServerName>> tail =
+      request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta;
+      ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
+    for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) {
+      // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only.
+      if (max <= 0) break;
+      if (++count > max) break;
+      HRegionInfo hri = e.getValue().getFirst();
+      ByteString row = ZeroCopyLiteralByteString.wrap(hri.getRegionName());
+      resultBuilder.clear();
+      resultBuilder.addCell(getRegionInfo(row, hri));
+      resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
+      resultBuilder.addCell(getStartCode(row));
+      builder.addResults(resultBuilder.build());
+      // Set more to false if we are on the last region in table.
+      if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
+      else builder.setMoreResults(true);
+    }
+    // If no scannerid, set one.
+    builder.setScannerId(request.hasScannerId()?
+      request.getScannerId(): sequenceids.incrementAndGet());
+    return builder.build();
+  }
+
+  static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
+      final GetRequest request) {
+    ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
+    ByteString row = request.getGet().getRow();
+    Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
+    if (p == null) {
+      if (request.getGet().getClosestRowBefore()) {
+        byte [] bytes = row.toByteArray();
+        SortedMap<byte [], Pair<HRegionInfo, ServerName>> head =
+          bytes != null? meta.headMap(bytes): meta;
+        p = head == null? null: head.get(head.lastKey());
+      }
+    }
+    if (p != null) {
+      resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
+      resultBuilder.addCell(getServer(row, p.getSecond()));
+    }
+    resultBuilder.addCell(getStartCode(row));
+    GetResponse.Builder builder = GetResponse.newBuilder();
+    builder.setResult(resultBuilder.build());
+    return builder.build();
+  }
+
+  /**
+   * @param name region name or encoded region name.
+   * @param type
+   * @return True if we are dealing with a hbase:meta region.
+   */
+  static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) {
+    switch (type) {
+    case REGION_NAME:
+      return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
+    case ENCODED_REGION_NAME:
+      return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
+    default: throw new UnsupportedOperationException();
+    }
+  }
+
+  private final static ByteString CATALOG_FAMILY_BYTESTRING =
+      ZeroCopyLiteralByteString.wrap(HConstants.CATALOG_FAMILY);
+  private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
+      ZeroCopyLiteralByteString.wrap(HConstants.REGIONINFO_QUALIFIER);
+  private final static ByteString SERVER_QUALIFIER_BYTESTRING =
+      ZeroCopyLiteralByteString.wrap(HConstants.SERVER_QUALIFIER);
+
+  static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
+    CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
+    cellBuilder.setRow(row);
+    cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
+    cellBuilder.setTimestamp(System.currentTimeMillis());
+    return cellBuilder;
+  }
+
+  static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
+    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
+    cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
+    cellBuilder.setValue(ZeroCopyLiteralByteString.wrap(hri.toByteArray()));
+    return cellBuilder.build();
+  }
+
+  static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
+    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
+    cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
+    cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort()));
+    return cellBuilder.build();
+  }
+
+  static CellProtos.Cell getStartCode(final ByteString row) {
+    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
+    cellBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(HConstants.STARTCODE_QUALIFIER));
+    // TODO:
+    cellBuilder.setValue(ZeroCopyLiteralByteString.wrap(Bytes.toBytes(META_SERVERNAME.getStartcode())));
+    return cellBuilder.build();
+  }
+
+  private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t");
+
+  /**
+   * Format passed integer.  Zero-pad.
+   * Copied from hbase-server PE class and small amendment.  Make them share.
+   * @param number
+   * @return Returns zero-prefixed 10-byte wide decimal version of passed
+   * number (Does absolute in case number is negative).
+   */
+  private static byte [] format(final long number) {
+    byte [] b = new byte[10];
+    long d = number;
+    for (int i = b.length - 1; i >= 0; i--) {
+      b[i] = (byte)((d % 10) + '0');
+      d /= 10;
+    }
+    return b;
+  }
+
+  /**
+   * @param count
+   * @param namespaceSpan
+   * @return <code>count</code> regions
+   */
+  private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count,
+      final long namespaceSpan) {
+    byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
+    byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
+    long interval = namespaceSpan / count;
+    HRegionInfo [] hris = new HRegionInfo[count];
+    for (int i = 0; i < count; i++) {
+      if (i == 0) {
+        endKey = format(interval);
+      } else {
+        startKey = endKey;
+        if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY;
+        else endKey = format((i + 1) * interval);
+      }
+      hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
+    }
+    return hris;
+  }
+
+  /**
+   * @param count
+   * @return Return <code>count</code> servernames.
+   */
+  private static ServerName [] makeServerNames(final int count) {
+    ServerName [] sns = new ServerName[count];
+    for (int i = 0; i < count; i++) {
+      sns[i] = new ServerName("" + i + ".example.org", 60010, i);
+    }
+    return sns;
+  }
+
+  /**
+   * Comparator for meta row keys.
+   */
+  private static class MetaRowsComparator implements Comparator<byte []> {
+    private final KeyValue.KVComparator delegate = new KeyValue.MetaComparator();
+    @Override
+    public int compare(byte[] left, byte[] right) {
+      return delegate.compareRows(left, 0, left.length, right, 0, right.length);
+    }
+  }
+
+  /**
+   * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and
+   * ServerName to return for this row.
+   * @param hris
+   * @param serverNames
+   * @return Map with faked hbase:meta content in it.
+   */
+  static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName,
+      final int regionCount, final long namespaceSpan, final int serverCount) {
+    // I need a comparator for meta rows so we sort properly.
+    SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta =
+      new ConcurrentSkipListMap<byte[], Pair<HRegionInfo,ServerName>>(new MetaRowsComparator());
+    HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
+    ServerName [] serverNames = makeServerNames(serverCount);
+    int per = regionCount / serverCount;
+    int count = 0;
+    for (HRegionInfo hri: hris) {
+      Pair<HRegionInfo, ServerName> p =
+        new Pair<HRegionInfo, ServerName>(hri, serverNames[count++ / per]);
+      meta.put(hri.getRegionName(), p);
+    }
+    return meta;
+  }
+
+  /**
+   * Code for each 'client' to run.
+   * @param c
+   * @param sharedConnection
+   * @throws IOException
+   */
+  static void cycle(final Configuration c, final HConnection sharedConnection) throws IOException {
+    HTableInterface table = sharedConnection.getTable(BIG_USER_TABLE);
+    table.setAutoFlushTo(false);
+    long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
+    long startTime = System.currentTimeMillis();
+    final int printInterval = 100000;
+    try {
+      Stopwatch stopWatch = new Stopwatch();
+      stopWatch.start();
+      for (int i = 0; i < namespaceSpan; i++) {
+        byte [] b = format(i);
+        Put p = new Put(b);
+        p.add(HConstants.CATALOG_FAMILY, b, b);
+        if (i % printInterval == 0) {
+          LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
+          stopWatch.reset();
+          stopWatch.start();
+        }
+        table.put(p);
+      }
+      LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
+          (System.currentTimeMillis() - startTime) + "ms");
+    } finally {
+      table.close();
+    }
+  }
+
+  @Override
+  public int run(String[] arg0) throws Exception {
+    int errCode = 0;
+    // TODO: Make command options.
+    // How many servers to fake.
+    final int servers = 1;
+    // How many regions to put on the faked servers.
+    final int regions = 100000;
+    // How many 'keys' in the faked regions.
+    final long namespaceSpan = 1000000;
+    // How long to take to pause after doing a put; make this long if you want to fake a struggling
+    // server.
+    final long multiPause = 0;
+    // Check args make basic sense.
+    if ((namespaceSpan < regions) || (regions < servers)) {
+      throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" +
+        regions + " which must be > servers=" + servers);
+    }
+
+    // Set my many servers and many regions faking connection in place.
+    getConf().set("hbase.client.connection.impl",
+      ManyServersManyRegionsConnection.class.getName());
+    // Use simple kv registry rather than zk
+    getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
+    // When to report fails.  Default is we report the 10th.  This means we'll see log everytime
+    // an exception is thrown -- usually RegionTooBusyException when we have more than
+    // hbase.test.multi.too.many requests outstanding at any time.
+    getConf().setInt("hbase.client.start.log.errors.counter", 0);
+ 
+    // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class.
+    getConf().setInt("hbase.test.regions", regions);
+    getConf().setLong("hbase.test.namespace.span", namespaceSpan);
+    getConf().setLong("hbase.test.servers", servers);
+    getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
+    getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
+    // Let there be ten outstanding requests at a time before we throw RegionBusyException.
+    getConf().setInt("hbase.test.multi.too.many", 10);
+    final int clients = 20;
+
+    // Have them all share the same connection so they all share the same instance of
+    // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
+    final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
+      // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
+    // Share a connection so I can keep counts in the 'server' on concurrency.
+    final HConnection sharedConnection = HConnectionManager.createConnection(getConf()/*, pool*/);
+    try {
+      Thread [] ts = new Thread[clients];
+      for (int j = 0; j < ts.length; j++) {
+        ts[j] = new Thread("" + j) {
+          final Configuration c = getConf();
+
+          @Override
+          public void run() {
+            try {
+              cycle(c, sharedConnection);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+        };
+        ts[j].start();
+      }
+      for (int j = 0; j < ts.length; j++) {
+        ts[j].join();
+      }
+    } finally {
+      sharedConnection.close();
+    }
+    return errCode;
+  }
+
+  /**
+   * Run a client instance against a faked up server.
+   * @param args TODO
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
+  }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java Wed Nov 13 20:49:59 2013
@@ -59,6 +59,8 @@ import org.apache.hadoop.hbase.ipc.proto
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -318,7 +320,10 @@ public class TestIPC {
         List<CellScannable> cells = new ArrayList<CellScannable>();
         // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
         ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction(
-            HConstants.EMPTY_BYTE_ARRAY, rm, cells);
+          HConstants.EMPTY_BYTE_ARRAY, rm, cells,
+          RegionAction.newBuilder(),
+          ClientProtos.Action.newBuilder(),
+          MutationProto.newBuilder());
         CellScanner cellScanner = CellUtil.createCellScanner(cells);
         if (i % 1000 == 0) {
           LOG.info("" + i);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java Wed Nov 13 20:49:59 2013
@@ -219,7 +219,8 @@ public class TestProtobufUtil {
     mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);
 
     Increment increment = ProtobufUtil.toIncrement(proto, null);
-    assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(increment));
+    assertEquals(mutateBuilder.build(),
+      ProtobufUtil.toMutation(increment, MutationProto.newBuilder()));
   }
 
   /**



Mime
View raw message