hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chia7...@apache.org
Subject hbase git commit: HBASE-17778 Remove the testing code in the AsyncRequestFutureImpl
Date Thu, 16 Mar 2017 23:55:27 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 d1ea718e4 -> d542b446b


HBASE-17778 Remove the testing code in the AsyncRequestFutureImpl


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d542b446
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d542b446
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d542b446

Branch: refs/heads/branch-1
Commit: d542b446b824315800b994cc3de7d8950a2ca671
Parents: d1ea718
Author: CHIA-PING TSAI <chia7712@gmail.com>
Authored: Mon Mar 13 19:28:57 2017 +0800
Committer: Chia-Ping Tsai <chia7712@gmail.com>
Committed: Fri Mar 17 07:52:45 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 67 +++++---------------
 .../hadoop/hbase/client/TestAsyncProcess.java   | 59 +++++++++++++++--
 2 files changed, 69 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d542b446/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 5bd9a4f..73cafc1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -404,7 +404,8 @@ class AsyncProcess {
    * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
    *         RuntimeException
    */
-  private ExecutorService getPool(ExecutorService pool) {
+  @VisibleForTesting
+  ExecutorService getPool(ExecutorService pool) {
     if (pool != null) {
       return pool;
     }
@@ -551,7 +552,8 @@ class AsyncProcess {
       List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>>
actionsByServer,
       ExecutorService pool) {
     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
-      tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
+      tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null,
+        operationTimeout, rpcTimeout);
     // Add location errors if any
     if (locationErrors != null) {
       for (int i = 0; i < locationErrors.size(); ++i) {
@@ -759,13 +761,14 @@ class AsyncProcess {
      * Runnable (that can be submitted to thread pool) that submits MultiAction to a
      * single server. The server call is synchronous, therefore we do it on a thread pool.
      */
-    private final class SingleServerRequestRunnable implements Runnable {
+    @VisibleForTesting
+    class SingleServerRequestRunnable implements Runnable {
       private final MultiAction<Row> multiAction;
       private final int numAttempt;
       private final ServerName server;
       private final Set<PayloadCarryingServerCallable> callsInProgress;
-      private Long heapSize = null;
-      private SingleServerRequestRunnable(
+      @VisibleForTesting
+      SingleServerRequestRunnable(
           MultiAction<Row> multiAction, int numAttempt, ServerName server,
           Set<PayloadCarryingServerCallable> callsInProgress) {
         this.multiAction = multiAction;
@@ -774,24 +777,6 @@ class AsyncProcess {
         this.callsInProgress = callsInProgress;
       }
 
-      @VisibleForTesting
-      long heapSize() {
-        if (heapSize != null) {
-          return heapSize;
-        }
-        heapSize = 0L;
-        for (Map.Entry<byte[], List<Action<Row>>> e: this.multiAction.actions.entrySet())
{
-          List<Action<Row>> actions = e.getValue();
-          for (Action<Row> action: actions) {
-            Row row = action.getAction();
-            if (row instanceof Mutation) {
-              heapSize += ((Mutation) row).heapSize();
-            }
-          }
-        }
-        return heapSize;
-      }
-
       @Override
       public void run() {
         MultiResponse res;
@@ -874,7 +859,6 @@ class AsyncProcess {
     private PayloadCarryingServerCallable currentCallable;
     private int operationTimeout;
     private int rpcTimeout;
-    private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
     private RetryingTimeTracker tracker;
 
     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions,
long nonceGroup,
@@ -961,21 +945,13 @@ class AsyncProcess {
     public Set<PayloadCarryingServerCallable> getCallsInProgress() {
       return callsInProgress;
     }
+
     @VisibleForTesting
-    Map<ServerName, List<Long>> getRequestHeapSize() {
-      return heapSizesByServer;
+    SingleServerRequestRunnable createSingleServerRequest(MultiAction<Row> multiAction,
int numAttempt, ServerName server,
+      Set<PayloadCarryingServerCallable> callsInProgress) {
+      return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
     }
 
-    private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server,
-        SingleServerRequestRunnable runnable) {
-      List<Long> heapCount = heapSizesByServer.get(server);
-      if (heapCount == null) {
-        heapCount = new LinkedList<>();
-        heapSizesByServer.put(server, heapCount);
-      }
-      heapCount.add(runnable.heapSize());
-      return runnable;
-    }
     /**
      * Group a list of actions per region servers, and send them.
      *
@@ -1148,8 +1124,7 @@ class AsyncProcess {
           connection.getConnectionMetrics().incrNormalRunners();
         }
         incTaskCounters(multiAction.getRegions(), server);
-        SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server,
-          new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress));
+        SingleServerRequestRunnable runnable = createSingleServerRequest(multiAction, numAttempt,
server, callsInProgress);
         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
       }
 
@@ -1172,8 +1147,7 @@ class AsyncProcess {
       for (DelayingRunner runner : actions.values()) {
         incTaskCounters(runner.getActions().getRegions(), server);
         String traceText = "AsyncProcess.sendMultiAction";
-        Runnable runnable = addSingleServerRequestHeapSize(server,
-          new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress));
+        Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server,
callsInProgress);
         // use a delay runner only if we need to sleep for some time
         if (runner.getSleepTime() > 0) {
           runner.setRunner(runnable);
@@ -1829,7 +1803,8 @@ class AsyncProcess {
     }
   }
 
-  protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
+  @VisibleForTesting
+  <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService
pool,
       Batch.Callback<CResult> callback, Object[] results, boolean needResults,
       PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) {
@@ -1838,16 +1813,6 @@ class AsyncProcess {
         results, callback, callable, operationTimeout, rpcTimeout);
   }
 
-  @VisibleForTesting
-  /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
-  protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
-      TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService
pool,
-      Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
-    return createAsyncRequestFuture(
-        tableName, actions, nonceGroup, pool, callback, results, needResults, null,
-        operationTimeout, rpcTimeout);
-  }
-
   /**
    * Create a caller. Isolated to be easily overridden in the tests.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/d542b446/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 027d362..8c0b7df 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -153,13 +153,16 @@ public class TestAsyncProcess {
     public AtomicInteger callsCt = new AtomicInteger();
     private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     private long previousTimeout = -1;
+
     @Override
-    protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName
tableName,
+    <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
         List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
-        Batch.Callback<Res> callback, Object[] results, boolean needResults) {
+        Batch.Callback<Res> callback, Object[] results, boolean needResults,
+        PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) {
       // Test HTable has tableName of null, so pass DUMMY_TABLE
-      AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
-          DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
+      MyAsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl(
+          DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults, results, callback,
callable,
+        operationTimeout, rpcTimeout);
       allReqs.add(r);
       callsCt.incrementAndGet();
       return r;
@@ -254,6 +257,50 @@ public class TestAsyncProcess {
         }
       };
     }
+
+    class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
+
+      private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
+
+      MyAsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions,
long nonceGroup,
+              ExecutorService pool, boolean needResults, Object[] results,
+              Batch.Callback<Res> callback, PayloadCarryingServerCallable callable,
+              int operationTimeout, int rpcTimeout) {
+        super(tableName, actions, nonceGroup, pool, needResults, results, callback, callable,
operationTimeout, rpcTimeout);
+      }
+
+      Map<ServerName, List<Long>> getRequestHeapSize() {
+        return heapSizesByServer;
+      }
+
+      @Override
+      SingleServerRequestRunnable createSingleServerRequest(
+              MultiAction<Row> multiAction, int numAttempt, ServerName server,
+              Set<PayloadCarryingServerCallable> callsInProgress) {
+        SingleServerRequestRunnable rq = new SingleServerRequestRunnable(
+                multiAction, numAttempt, server, callsInProgress);
+        List<Long> heapCount = heapSizesByServer.get(server);
+        if (heapCount == null) {
+          heapCount = new ArrayList<>();
+          heapSizesByServer.put(server, heapCount);
+        }
+        heapCount.add(heapSizeOf(multiAction));
+        return rq;
+      }
+
+      long heapSizeOf(MultiAction<Row> multiAction) {
+        long sum = 0;
+        for (List<Action<Row>> actions : multiAction.actions.values()) {
+          for (Action action : actions) {
+            Row row = action.getAction();
+            if (row instanceof Mutation) {
+              sum += ((Mutation) row).heapSize();
+            }
+          }
+        }
+        return sum;
+      }
+    }
   }
 
   static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
@@ -644,7 +691,7 @@ public class TestAsyncProcess {
         if (!(req instanceof AsyncRequestFutureImpl)) {
           continue;
         }
-        AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req;
+        MyAsyncProcess.MyAsyncRequestFutureImpl ars = (MyAsyncProcess.MyAsyncRequestFutureImpl)
req;
         if (ars.getRequestHeapSize().containsKey(sn)) {
           ++actualSnReqCount;
         }
@@ -660,7 +707,7 @@ public class TestAsyncProcess {
         if (!(req instanceof AsyncRequestFutureImpl)) {
           continue;
         }
-        AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req;
+        MyAsyncProcess.MyAsyncRequestFutureImpl ars = (MyAsyncProcess.MyAsyncRequestFutureImpl)
req;
         Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
         for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet())
{
           long sum = 0;


Mime
View raw message