hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenh...@apache.org
Subject hbase git commit: HBASE-16596 Reduce redundant interfaces in AsyncProcess
Date Sat, 10 Sep 2016 03:14:31 GMT
Repository: hbase
Updated Branches:
  refs/heads/master e1e063720 -> cc2a40a78


HBASE-16596 Reduce redundant interfaces in AsyncProcess


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cc2a40a7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cc2a40a7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cc2a40a7

Branch: refs/heads/master
Commit: cc2a40a78f4e65ef38dad2cbc921613c4d15cbf7
Parents: e1e0637
Author: chenheng <chenheng@apache.org>
Authored: Sat Sep 10 11:13:28 2016 +0800
Committer: chenheng <chenheng@apache.org>
Committed: Sat Sep 10 11:13:28 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 29 +---------
 .../org/apache/hadoop/hbase/client/HTable.java  |  2 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   | 59 +++++++++-----------
 .../hadoop/hbase/client/TestClientPushback.java |  2 +-
 4 files changed, 31 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cc2a40a7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 5bb0f58..c5745e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -392,15 +392,7 @@ class AsyncProcess {
     }
     throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
   }
-  /**
-   * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}.
-   * Uses default ExecutorService for this AP (must have been created with one).
-   */
-  public <CResult> AsyncRequestFuture submit(TableName tableName, final List<? extends
Row> rows,
-      boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
-      throws InterruptedIOException {
-    return submit(null, tableName, rows, atLeastOne, callback, needResults);
-  }
+
   /**
    * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}.
    * Uses default ExecutorService for this AP (must have been created with one).
@@ -529,7 +521,7 @@ class AsyncProcess {
       List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>>
actionsByServer,
       ExecutorService pool) {
     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
-      tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
+      tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null,
timeout);
     // Add location errors if any
     if (locationErrors != null) {
       for (int i = 0; i < locationErrors.size(); ++i) {
@@ -564,14 +556,6 @@ class AsyncProcess {
 
     multiAction.add(regionName, action);
   }
-  /**
-   * See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, Object[])}.
-   * Uses default ExecutorService for this AP (must have been created with one).
-   */
-  public <CResult> AsyncRequestFuture submitAll(TableName tableName,
-      List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results)
{
-    return submitAll(null, tableName, rows, callback, results, null, timeout);
-  }
 
   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results)
{
@@ -1785,15 +1769,6 @@ class AsyncProcess {
         results, callback, callable, curTimeout);
   }
 
-  @VisibleForTesting
-  /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
-  protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
-      TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService
pool,
-      Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
-    return createAsyncRequestFuture(
-        tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout);
-  }
-
   /**
    * Create a callable. Isolated to be easily overridden in the tests.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/cc2a40a7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index f8bbfc1..492714f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1218,7 +1218,7 @@ public class HTable implements Table {
             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
             true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
 
-    AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
+    AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs,
         new Callback<ClientProtos.CoprocessorServiceResult>() {
           @Override
           public void update(byte[] region, byte[] row,

http://git-wip-us.apache.org/repos/asf/hbase/blob/cc2a40a7/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 656dcfc..e7366a9 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -159,10 +159,11 @@ public class TestAsyncProcess {
     @Override
     protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName
tableName,
         List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
-        Batch.Callback<Res> callback, Object[] results, boolean needResults) {
+        Batch.Callback<Res> callback, Object[] results, boolean needResults,
+        CancellableRegionServerCallable callable, int curTimeout) {
       // Test HTable has tableName of null, so pass DUMMY_TABLE
       AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
-          DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
+          DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults, null, rpcTimeout);
       allReqs.add(r);
       return r;
     }
@@ -203,13 +204,7 @@ public class TestAsyncProcess {
       // We use results in tests to check things, so override to always save them.
       return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
     }
-    @Override
-    public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row>
rows,
-        boolean atLeastOne, Callback<Res> callback, boolean needResults)
-            throws InterruptedIOException {
-      // We use results in tests to check things, so override to always save them.
-      return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
-    }
+
     @Override
     public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
@@ -671,7 +666,7 @@ public class TestAsyncProcess {
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
 
-    ap.submit(DUMMY_TABLE, puts, false, null, false);
+    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
     Assert.assertTrue(puts.isEmpty());
   }
 
@@ -690,7 +685,7 @@ public class TestAsyncProcess {
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
 
-    final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
+    final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false);
     Assert.assertTrue(puts.isEmpty());
     ars.waitUntilDone();
     Assert.assertEquals(updateCalled.get(), 1);
@@ -707,11 +702,11 @@ public class TestAsyncProcess {
     for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) {
       ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
     }
-    ap.submit(DUMMY_TABLE, puts, false, null, false);
+    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
     Assert.assertEquals(puts.size(), 1);
 
     ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
-    ap.submit(DUMMY_TABLE, puts, false, null, false);
+    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
     Assert.assertEquals(0, puts.size());
   }
 
@@ -729,11 +724,11 @@ public class TestAsyncProcess {
     puts.add(createPut(1, true)); // <== this one will make it, the region is already
in
     puts.add(createPut(2, true)); // <== new region, but the rs is ok
 
-    ap.submit(DUMMY_TABLE, puts, false, null, false);
+    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
     Assert.assertEquals(" puts=" + puts, 1, puts.size());
 
     ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
-    ap.submit(DUMMY_TABLE, puts, false, null, false);
+    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
     Assert.assertTrue(puts.isEmpty());
   }
 
@@ -745,7 +740,7 @@ public class TestAsyncProcess {
     Put p = createPut(1, false);
     puts.add(p);
 
-    AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
+    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
     Assert.assertEquals(0, puts.size());
     ars.waitUntilDone();
     verifyResult(ars, false);
@@ -788,12 +783,12 @@ public class TestAsyncProcess {
     Put p = createPut(1, true);
     puts.add(p);
 
-    ap.submit(DUMMY_TABLE, puts, false, null, false);
+    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
     Assert.assertFalse(puts.isEmpty());
 
     t.start();
 
-    ap.submit(DUMMY_TABLE, puts, true, null, false);
+    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
     Assert.assertTrue(puts.isEmpty());
 
     checkPoint.set(true);
@@ -811,7 +806,7 @@ public class TestAsyncProcess {
     puts.add(createPut(1, true));
     puts.add(createPut(1, true));
 
-    AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
+    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
     Assert.assertTrue(puts.isEmpty());
     ars.waitUntilDone();
     verifyResult(ars, false, true, true);
@@ -822,7 +817,7 @@ public class TestAsyncProcess {
     puts.add(createPut(1, true));
     // Wait for AP to be free. While ars might have the result, ap counters are decreased
later.
     ap.waitUntilDone();
-    ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
+    ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
     Assert.assertEquals(0, puts.size());
     ars.waitUntilDone();
     Assert.assertEquals(1, ap.callsCt.get());
@@ -838,7 +833,7 @@ public class TestAsyncProcess {
     puts.add(createPut(1, true));
     puts.add(createPut(1, true));
 
-    AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
+    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
     ars.waitUntilDone();
     verifyResult(ars, false, true, true);
     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
@@ -873,7 +868,7 @@ public class TestAsyncProcess {
       puts.add(createPut(2, true));
       puts.add(createPut(3, true));
     }
-    ap.submit(DUMMY_TABLE, puts, true, null, false);
+    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
     ap.waitUntilDone();
     // More time to wait if there are incorrect task count.
     TimeUnit.SECONDS.sleep(1);
@@ -910,7 +905,7 @@ public class TestAsyncProcess {
     t.start();
 
     try {
-      ap.submit(DUMMY_TABLE, puts, false, null, false);
+      ap.submit(null, DUMMY_TABLE, puts, false, null, false);
       Assert.fail("We should have been interrupted.");
     } catch (InterruptedIOException expected) {
     }
@@ -929,7 +924,7 @@ public class TestAsyncProcess {
     t2.start();
 
     long start = System.currentTimeMillis();
-    ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false);
+    ap.submit(null, DUMMY_TABLE, new ArrayList<Row>(), false, null, false);
     long end = System.currentTimeMillis();
 
     //Adds 100 to secure us against approximate timing.
@@ -1444,7 +1439,7 @@ public class TestAsyncProcess {
     // One region has no replica, so the main call succeeds for it.
     MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
-    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
+    AsyncRequestFuture ars = ap.submitAll(null,DUMMY_TABLE, rows, null, new Object[3]);
     verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
     Assert.assertEquals(2, ap.getReplicaCallCount());
   }
@@ -1454,7 +1449,7 @@ public class TestAsyncProcess {
     // Main call succeeds before replica calls are kicked off.
     MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
-    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
+    AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[3]);
     verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
     Assert.assertEquals(0, ap.getReplicaCallCount());
   }
@@ -1464,7 +1459,7 @@ public class TestAsyncProcess {
     // Either main or replica can succeed.
     MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
-    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+    AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
     verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
     long replicaCalls = ap.getReplicaCallCount();
     Assert.assertTrue(replicaCalls >= 0);
@@ -1479,7 +1474,7 @@ public class TestAsyncProcess {
     MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
     ap.setPrimaryCallDelay(sn2, 2000);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
-    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+    AsyncRequestFuture ars = ap.submitAll(null ,DUMMY_TABLE, rows, null, new Object[2]);
     verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
     Assert.assertEquals(1, ap.getReplicaCallCount());
   }
@@ -1492,7 +1487,7 @@ public class TestAsyncProcess {
     MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
     ap.addFailures(hri1, hri2);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
-    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+    AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
     verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
     Assert.assertEquals(0, ap.getReplicaCallCount());
   }
@@ -1504,7 +1499,7 @@ public class TestAsyncProcess {
     MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
     ap.addFailures(hri1, hri1r2, hri2);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
-    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+    AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
     verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
     Assert.assertEquals(2, ap.getReplicaCallCount());
   }
@@ -1516,7 +1511,7 @@ public class TestAsyncProcess {
     MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
     ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
-    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+    AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
     verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
     // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
     Assert.assertEquals(3, ars.getErrors().getNumExceptions());
@@ -1646,7 +1641,7 @@ public class TestAsyncProcess {
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
 
-    ap.submit(DUMMY_TABLE, puts, false, null, false);
+    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
     Assert.assertTrue(puts.isEmpty());
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/cc2a40a7/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index baec37e..f4fd603 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -136,7 +136,7 @@ public class TestClientPushback {
     final AtomicLong endTime = new AtomicLong();
     long startTime = EnvironmentEdgeManager.currentTime();
 
-    ((HTable) table).mutator.ap.submit(tableName, ops, true, new Batch.Callback<Result>()
{
+    ((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback<Result>()
{
       @Override
       public void update(byte[] region, byte[] row, Result result) {
         endTime.set(EnvironmentEdgeManager.currentTime());


Mime
View raw message