hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkey...@apache.org
Subject svn commit: r1526828 - /hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Date Fri, 27 Sep 2013 08:22:56 GMT
Author: nkeywal
Date: Fri Sep 27 08:22:56 2013
New Revision: 1526828

URL: http://svn.apache.org/r1526828
Log:
HBASE-9647 Add a test in TestAsyncProcess to check the number of threads created

Modified:
    hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java

Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1526828&r1=1526827&r2=1526828&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
(original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Fri Sep 27 08:22:56 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,8 +40,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,6 +59,7 @@ public class TestAsyncProcess {
   private static final Configuration conf = new Configuration();
 
   private static ServerName sn = new ServerName("localhost:10,1254");
+  private static ServerName sn2 = new ServerName("localhost:140,12540");
   private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2);
   private static HRegionInfo hri2 =
       new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW);
@@ -66,15 +70,38 @@ public class TestAsyncProcess {
   private static Exception failure = new Exception("failure");
 
   static class MyAsyncProcess<Res> extends AsyncProcess<Res> {
+    final AtomicInteger nbMultiResponse = new AtomicInteger();
+    final AtomicInteger nbActions = new AtomicInteger();
+
+    static class CountingThreadFactory implements ThreadFactory {
+      final AtomicInteger nbThreads;
+      ThreadFactory realFactory =  Threads.newDaemonThreadFactory("test-TestAsyncProcess");
+      @Override
+      public Thread newThread(Runnable r) {
+        nbThreads.incrementAndGet();
+        return realFactory.newThread(r);
+      }
+
+      CountingThreadFactory(AtomicInteger nbThreads){
+        this.nbThreads = nbThreads;
+      }
+    }
+
     public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration
conf) {
-      super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS,
-        new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("test-TestAsyncProcess")),
+      this(hc, callback, conf, new AtomicInteger());
+    }
+
+      public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration
conf,
+                          AtomicInteger nbThreads) {
+      super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
           callback, conf, new RpcRetryingCallerFactory(conf));
     }
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row>
callable) {
-      final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti());
+      final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti(),
+          nbMultiResponse, nbActions);
       return new RpcRetryingCaller<MultiResponse>(conf) {
         @Override
         public MultiResponse callWithoutRetries( RetryingCallable<MultiResponse> callable)
@@ -86,10 +113,12 @@ public class TestAsyncProcess {
   }
 
   static MultiResponse createMultiResponse(final HRegionLocation loc,
-      final MultiAction<Row> multi) {
+      final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions)
{
     final MultiResponse mr = new MultiResponse();
+    nbMultiResponse.incrementAndGet();
     for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet())
{
       for (Action a : entry.getValue()) {
+        nbActions.incrementAndGet();
         if (Arrays.equals(FAILS, a.getAction().getRow())) {
           mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure);
         } else {
@@ -99,12 +128,12 @@ public class TestAsyncProcess {
     }
     return mr;
   }
-
   /**
    * Returns our async process.
    */
   static class MyConnectionImpl extends HConnectionManager.HConnectionImplementation {
     MyAsyncProcess<?> ap;
+    final AtomicInteger nbThreads = new AtomicInteger(0);
     final static Configuration c = new Configuration();
 
     static {
@@ -123,8 +152,8 @@ public class TestAsyncProcess {
     protected <R> AsyncProcess createAsyncProcess(TableName tableName,
                                                   ExecutorService pool,
                                                   AsyncProcess.AsyncProcessCallback<R>
callback,
-                                                  Configuration conf) {
-      ap = new MyAsyncProcess<R>(this, callback, conf);
+                                                  Configuration confn ) {
+      ap = new MyAsyncProcess<R>(this, callback, c, nbThreads);
       return ap;
     }
 
@@ -133,7 +162,29 @@ public class TestAsyncProcess {
                                         final byte[] row) {
       return loc1;
     }
+  }
+
+  /**
+   * Returns our async process.
+   */
+  static class MyConnectionImpl2 extends MyConnectionImpl {
+    List<HRegionLocation> hrl;
+    boolean usedRegions[];
+
+    protected MyConnectionImpl2(List<HRegionLocation> hrl) {
+      super(c);
+      this.hrl = hrl;
+      this.usedRegions = new boolean[hrl.size()];
+    }
 
+    @Override
+    public HRegionLocation locateRegion(final TableName tableName,
+                                        final byte[] row) {
+      Random rd = new Random(Bytes.toLong(row));
+      int pos = rd.nextInt(hrl.size());
+      usedRegions[pos] = true;
+      return hrl.get(pos);
+    }
   }
 
   @Test
@@ -613,6 +664,41 @@ public class TestAsyncProcess {
     Assert.assertTrue((System.currentTimeMillis() - start) < 10000);
   }
 
+  /**
+   * This test simulates multiple regions on 2 servers. We should have 2 multi requests and
+   *  2 threads: 1 per server, this whatever the number of regions.
+   */
+  @Test
+  public void testThreadCreation() throws Exception {
+    final int NB_REGS = 10000;
+    List<HRegionLocation> hrls = new ArrayList<HRegionLocation>(NB_REGS);
+    List<Get> gets = new ArrayList<Get>(NB_REGS);
+    for (int i = 0; i < NB_REGS; i++) {
+      HRegionInfo hri = new HRegionInfo(
+          DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L));
+      HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
+      hrls.add(hrl);
+
+      Get get = new Get(Bytes.toBytes(i * 10L + 5L));
+      gets.add(get);
+    }
+
+    HTable ht = new HTable();
+    MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
+    ht.connection = con;
+    ht.batch((List) gets);
+
+    Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
+    Assert.assertEquals(con.ap.nbMultiResponse.get(), 2); // 1 multi response per server
+    Assert.assertEquals(con.nbThreads.get(), 2);  // 1 thread per server
+
+    int nbReg = 0;
+    for (int i =0; i<NB_REGS; i++){
+      if (con.usedRegions[i]) nbReg++;
+    }
+    Assert.assertTrue("nbReg=" + nbReg, nbReg > NB_REGS / 10);
+  }
+
 
   /**
    * @param reg1    if true, creates a put on region 1, region 2 otherwise



Mime
View raw message