hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [1/3] hbase git commit: HBASE-17174 Refactor the AsyncProcess, BufferedMutatorImpl, and HTable
Date Sat, 24 Dec 2016 04:10:45 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 992e5717d -> 8cb55c408


http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java
new file mode 100644
index 0000000..b46e572
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java
@@ -0,0 +1,336 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ClientTests.class, SmallTests.class})
+public class TestSimpleRequestController {
+
+  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
+  // Row keys go through Bytes.toBytes so the encoding does not depend on the platform charset.
+  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
+  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
+  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
+  private static final ServerName SN = ServerName.valueOf("s1:1,1");
+  private static final ServerName SN2 = ServerName.valueOf("s2:2,2");
+  private static final ServerName SN3 = ServerName.valueOf("s3:3,3");
+  private static final HRegionInfo HRI1
+          = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
+  private static final HRegionInfo HRI2
+          = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
+  private static final HRegionInfo HRI3
+          = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
+  private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN);
+  private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN); // same server as LOC1
+  private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2); // a different server
+
+  @Test // a non-positive per-request heap size must be rejected
+  public void testIllegalRequestSize() {
+    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
+  }
+
+  @Test // a non-positive per-server task limit must be rejected
+  public void testIllegalRsTasks() {
+    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1);
+  }
+
+  @Test // a non-positive per-region task limit must be rejected
+  public void testIllegalRegionTasks() {
+    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1);
+  }
+
+  @Test // a non-positive submitted heap size must be rejected
+  public void testIllegalSubmittedSize() {
+    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1);
+  }
+
+  private void testIllegalArgument(String key, long value) { // constructor must refuse key=value
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(key, value); // apply the setting under test, not a fixed key
+    try {
+      new SimpleRequestController(conf);
+      fail("The " + key + " must be bigger than zero");
+    } catch (IllegalArgumentException expected) { // the illegal value was rejected, as required
+    }
+  }
+
+  private static Put createPut(long maxHeapSizePerRequest) { // Put reporting a caller-chosen size
+    return new Put(Bytes.toBytes("row")) {
+      @Override
+      public long heapSize() {
+        return maxHeapSizePerRequest; // fixed value supplied by the test
+      }
+    };
+  }
+
+  @Test // drives a TaskCountChecker and a RequestSizeChecker through one combined Checker
+  public void testTaskCheckerHost() throws IOException {
+    final int maxTotalConcurrentTasks = 100;
+    final int maxConcurrentTasksPerServer = 2;
+    final int maxConcurrentTasksPerRegion = 1;
+    final AtomicLong tasksInProgress = new AtomicLong(0);
+    final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
+    final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
+    SimpleRequestController.TaskCountChecker countChecker = new SimpleRequestController.TaskCountChecker(
+            maxTotalConcurrentTasks,
+            maxConcurrentTasksPerServer,
+            maxConcurrentTasksPerRegion,
+            tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
+    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
+    // the size checker gets the 2 MB limit above, so a single full-size row uses it up
+    SimpleRequestController.RequestSizeChecker sizeChecker = new SimpleRequestController.RequestSizeChecker(maxHeapSizePerRequest);
+    RequestController.Checker checker = SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
+    ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
+    assertEquals(ReturnCode.INCLUDE, loc1Code);
+
+    ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
+    // rejected for size
+    assertNotEquals(ReturnCode.INCLUDE, loc1Code_2);
+
+    ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest));
+    // rejected for size (LOC2 is on the same server as LOC1)
+    assertNotEquals(ReturnCode.INCLUDE, loc2Code);
+
+    // fill the task slots for LOC3 (NOTE(review): byte[] HashMap keys match by identity only).
+    taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100));
+    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
+
+    ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L));
+    // rejected for count
+    assertNotEquals(ReturnCode.INCLUDE, loc3Code);
+
+    // release the task slots for LOC3.
+    taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0));
+    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
+
+    ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L));
+    assertEquals(ReturnCode.INCLUDE, loc3Code_2);
+  }
+
+  @Test // NOTE(review): the method name has a doubled "r" (Checkerr); kept so the test id is stable
+  public void testRequestSizeCheckerr() throws IOException {
+    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
+    SimpleRequestController.RequestSizeChecker checker
+            = new SimpleRequestController.RequestSizeChecker(maxHeapSizePerRequest);
+
+    // canTakeOperation alone leaves the checker's state unchanged, so repeated checks keep passing.
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
+      assertEquals(ReturnCode.INCLUDE, code);
+      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
+      assertEquals(ReturnCode.INCLUDE, code);
+    }
+
+    // accept the data located on the LOC1 region; notifyFinal records the decision.
+    ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
+    assertEquals(ReturnCode.INCLUDE, acceptCode);
+    checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest);
+
+    // server SN (hosting LOC1 and LOC2) has reached the limit, so both regions are rejected.
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
+      assertNotEquals(ReturnCode.INCLUDE, code);
+      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
+      assertNotEquals(ReturnCode.INCLUDE, code);
+    }
+
+    // the request to the SN2 server (LOC3) should still be accepted.
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest);
+      assertEquals(ReturnCode.INCLUDE, code);
+    }
+
+    checker.reset(); // after reset() the regions on SN are accepted again
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
+      assertEquals(ReturnCode.INCLUDE, code);
+      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
+      assertEquals(ReturnCode.INCLUDE, code);
+    }
+  }
+
+  @Test // SubmittedSizeChecker answers END once the recorded size passes the limit, until reset()
+  public void testSubmittedSizeChecker() {
+    final long maxHeapSizeSubmit = 2 * 1024 * 1024;
+    SimpleRequestController.SubmittedSizeChecker checker
+            = new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit);
+
+    for (int i = 0; i != 10; ++i) { // nothing has been recorded yet: every check is accepted
+      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
+      assertEquals(ReturnCode.INCLUDE, include);
+    }
+
+    for (int i = 0; i != 10; ++i) { // record ten accepted operations of the full limit each
+      checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit);
+    }
+
+    for (int i = 0; i != 10; ++i) { // the same region is now refused with END
+      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
+      assertEquals(ReturnCode.END, include);
+    }
+    for (int i = 0; i != 10; ++i) { // and so is a different region (LOC2)
+      ReturnCode include = checker.canTakeOperation(LOC2, 100000);
+      assertEquals(ReturnCode.END, include);
+    }
+    checker.reset(); // after reset() operations are accepted again
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
+      assertEquals(ReturnCode.INCLUDE, include);
+    }
+  }
+
+  @Test // a region accepted once stays accepted until reset(), even after its task slots fill up
+  public void testTaskCountChecker() throws InterruptedIOException {
+    long rowSize = 12345;
+    int maxTotalConcurrentTasks = 100;
+    int maxConcurrentTasksPerServer = 2;
+    int maxConcurrentTasksPerRegion = 1;
+    AtomicLong tasksInProgress = new AtomicLong(0);
+    Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
+    Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
+    SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker(
+            maxTotalConcurrentTasks,
+            maxConcurrentTasksPerServer,
+            maxConcurrentTasksPerRegion,
+            tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
+
+    // canTakeOperation alone leaves the checker's state unchanged, so repeated checks keep passing.
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode code = checker.canTakeOperation(LOC1, rowSize);
+      assertEquals(ReturnCode.INCLUDE, code);
+    }
+    // add LOC1 region.
+    ReturnCode code = checker.canTakeOperation(LOC1, rowSize);
+    assertEquals(ReturnCode.INCLUDE, code);
+    checker.notifyFinal(code, LOC1, rowSize);
+
+    // fill the task slots for LOC1 (NOTE(review): byte[] HashMap keys match by identity only).
+    taskCounterPerRegion.put(LOC1.getRegionInfo().getRegionName(), new AtomicInteger(100));
+    taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100));
+
+    // the region was previously accepted, so it must be accepted now.
+    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
+      ReturnCode includeCode = checker.canTakeOperation(LOC1, rowSize);
+      assertEquals(ReturnCode.INCLUDE, includeCode);
+      checker.notifyFinal(includeCode, LOC1, rowSize);
+    }
+
+    // fill the task slots for LOC3.
+    taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100));
+    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
+
+    // no task slots are left and LOC3 was never accepted, so it is refused.
+    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
+      ReturnCode excludeCode = checker.canTakeOperation(LOC3, rowSize);
+      assertNotEquals(ReturnCode.INCLUDE, excludeCode);
+      checker.notifyFinal(excludeCode, LOC3, rowSize);
+    }
+
+    // release the tasks for LOC3.
+    taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0));
+    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
+
+    // add LOC3 region.
+    ReturnCode code3 = checker.canTakeOperation(LOC3, rowSize);
+    assertEquals(ReturnCode.INCLUDE, code3);
+    checker.notifyFinal(code3, LOC3, rowSize);
+
+    // the region was previously accepted, so it must be accepted now.
+    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
+      ReturnCode includeCode = checker.canTakeOperation(LOC3, rowSize);
+      assertEquals(ReturnCode.INCLUDE, includeCode);
+      checker.notifyFinal(includeCode, LOC3, rowSize);
+    }
+
+    checker.reset();
+    // the region was previously accepted,
+    // but the checker has been reset and the task slots for LOC1 are still full.
+    // So it must be rejected now.
+    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
+      ReturnCode includeCode = checker.canTakeOperation(LOC1, rowSize);
+      assertNotEquals(ReturnCode.INCLUDE, includeCode);
+      checker.notifyFinal(includeCode, LOC1, rowSize);
+    }
+  }
+
+  @Test // a worker thread calls waitForMaximumCurrentTasks while this thread adjusts the counter
+  public void testWaitForMaximumCurrentTasks() throws Exception {
+    final AtomicInteger max = new AtomicInteger(0);
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+    SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create());
+    final AtomicLong tasks = controller.tasksInProgress;
+    Runnable runnable = () -> {
+      try {
+        barrier.await(); // rendezvous with the test thread before waiting
+        controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null);
+      } catch (InterruptedIOException e) {
+        Assert.fail(e.getMessage()); // NOTE(review): thrown on the worker; JUnit may not see it
+      } catch (InterruptedException e) {
+        // NOTE(review): only printed; the interrupt flag is not restored and the test does not fail
+        e.printStackTrace();
+      } catch (BrokenBarrierException e) {
+        // NOTE(review): only printed; a broken barrier does not fail the test
+        e.printStackTrace();
+      }
+    };
+    // First test that our runnable thread only exits when tasks is zero.
+    Thread t = new Thread(runnable);
+    t.start();
+    barrier.await();
+    t.join();
+    // Now assert we stay running if max == zero and tasks is > 0.
+    barrier.reset();
+    tasks.set(1000000);
+    t = new Thread(runnable);
+    t.start();
+    barrier.await();
+    while (tasks.get() > 0) {
+      assertTrue(t.isAlive()); // the worker must still be waiting while tasks remain
+      tasks.set(tasks.get() - 1); // only this thread writes the counter in this test
+    }
+    t.join();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index ee89609..e5ab3e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -126,11 +126,8 @@ public class HConnectionTestingUtility {
     NonceGenerator ng = Mockito.mock(NonceGenerator.class);
     Mockito.when(c.getNonceGenerator()).thenReturn(ng);
     Mockito.when(c.getAsyncProcess()).thenReturn(
-      new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
-          RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-              HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt(
-                  HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)));
+      new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), false,
+          RpcControllerFactory.instantiate(conf)));
     Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
         RpcRetryingCallerFactory.instantiate(conf,
             RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index 53488ec..2c5e89d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.AsyncProcessTask;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
@@ -137,14 +138,20 @@ public class TestClientPushback {
     final CountDownLatch latch = new CountDownLatch(1);
     final AtomicLong endTime = new AtomicLong();
     long startTime = EnvironmentEdgeManager.currentTime();
-
-    ((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback<Result>() {
-      @Override
-      public void update(byte[] region, byte[] row, Result result) {
+    BufferedMutatorImpl mutator = ((HTable) table).mutator;
+    Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> {
         endTime.set(EnvironmentEdgeManager.currentTime());
         latch.countDown();
-      }
-    }, true);
+    };
+    AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback)
+            .setPool(mutator.getPool())
+            .setTableName(tableName)
+            .setRowAccess(ops)
+            .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
+            .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
+            .setRpcTimeout(60 * 1000)
+            .build();
+    mutator.getAsyncProcess().submit(task);
     // Currently the ExponentialClientBackoffPolicy under these test conditions
     // produces a backoffTime of 151 milliseconds. This is long enough so the
     // wait and related checks below are reasonable. Revisit if the backoff

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 6d1e1f0..0f7f3d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -563,9 +563,17 @@ public class TestReplicasClient {
       gets.add(g);
       Object[] results = new Object[2];
 
-      AsyncRequestFuture reqs = ap.submitAll(
-          HTable.getDefaultExecutor(HTU.getConfiguration()),
-          table.getName(), gets, null, results);
+      int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout();
+      int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout();
+      AsyncProcessTask task = AsyncProcessTask.newBuilder()
+              .setPool(HTable.getDefaultExecutor(HTU.getConfiguration()))
+              .setTableName(table.getName())
+              .setRowAccess(gets)
+              .setResults(results)
+              .setOperationTimeout(operationTimeout)
+              .setRpcTimeout(readTimeout)
+              .build();
+      AsyncRequestFuture reqs = ap.submit(task);
       reqs.waitUntilDone();
       // verify we got the right results back
       for (Object r : results) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index be41e54..295f47a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -501,7 +501,6 @@ public class TestPerColumnFamilyFlush {
           Thread.sleep(100);
         }
       }
-      table.close();
       assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
       assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound);
       assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
index 68fffb1..380c252 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
@@ -171,22 +171,35 @@ public class TestTablePermissions {
     }
   }
 
+  /**
+   * Adds userPerm via AccessControlLists.addUserPermission and closes t afterwards, even on failure.
+   */
+  private void addUserPermission(Configuration conf, UserPermission userPerm, Table t) throws IOException {
+    try {
+      AccessControlLists.addUserPermission(conf, userPerm, t);
+    } finally {
+      t.close(); // runs whether or not the call above threw
+    }
+  }
+
   @Test
   public void testBasicWrite() throws Exception {
     Configuration conf = UTIL.getConfiguration();
-    try (Connection connection = ConnectionFactory.createConnection(conf);
-        Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
       // add some permissions
-      AccessControlLists.addUserPermission(conf,
+      addUserPermission(conf,
           new UserPermission(Bytes.toBytes("george"), TEST_TABLE, null, (byte[])null,
-              UserPermission.Action.READ, UserPermission.Action.WRITE), table);
-      AccessControlLists.addUserPermission(conf,
+              UserPermission.Action.READ, UserPermission.Action.WRITE),
+              connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+      addUserPermission(conf,
           new UserPermission(Bytes.toBytes("hubert"), TEST_TABLE, null, (byte[])null,
-              UserPermission.Action.READ), table);
-      AccessControlLists.addUserPermission(conf,
+              UserPermission.Action.READ),
+          connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+      addUserPermission(conf,
           new UserPermission(Bytes.toBytes("humphrey"),
               TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
-              UserPermission.Action.READ), table);
+              UserPermission.Action.READ),
+          connection.getTable(AccessControlLists.ACL_TABLE_NAME));
     }
     // retrieve the same
     ListMultimap<String,TablePermission> perms =
@@ -274,23 +287,22 @@ public class TestTablePermissions {
   @Test
   public void testPersistence() throws Exception {
     Configuration conf = UTIL.getConfiguration();
-    try (Connection connection = ConnectionFactory.createConnection(conf);
-        Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
-      AccessControlLists.addUserPermission(conf,
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
+      addUserPermission(conf,
           new UserPermission(Bytes.toBytes("albert"), TEST_TABLE, null,
-              (byte[])null, TablePermission.Action.READ), table);
-      AccessControlLists.addUserPermission(conf,
+              (byte[])null, TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+      addUserPermission(conf,
           new UserPermission(Bytes.toBytes("betty"), TEST_TABLE, null,
               (byte[])null, TablePermission.Action.READ,
-              TablePermission.Action.WRITE), table);
-      AccessControlLists.addUserPermission(conf,
+              TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+      addUserPermission(conf,
           new UserPermission(Bytes.toBytes("clark"),
               TEST_TABLE, TEST_FAMILY,
-              TablePermission.Action.READ), table);
-      AccessControlLists.addUserPermission(conf,
+              TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+      addUserPermission(conf,
           new UserPermission(Bytes.toBytes("dwight"),
               TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
-              TablePermission.Action.WRITE), table);
+              TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
     }
     // verify permissions survive changes in table metadata
     ListMultimap<String,TablePermission> preperms =
@@ -404,17 +416,17 @@ public class TestTablePermissions {
     Configuration conf = UTIL.getConfiguration();
 
     // add some permissions
-    try (Connection connection = ConnectionFactory.createConnection(conf);
-        Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
-      AccessControlLists.addUserPermission(conf,
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
+      addUserPermission(conf,
           new UserPermission(Bytes.toBytes("user1"),
-              Permission.Action.READ, Permission.Action.WRITE), table);
-      AccessControlLists.addUserPermission(conf,
+              Permission.Action.READ, Permission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+      addUserPermission(conf,
           new UserPermission(Bytes.toBytes("user2"),
-              Permission.Action.CREATE), table);
-      AccessControlLists.addUserPermission(conf,
+              Permission.Action.CREATE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+      addUserPermission(conf,
           new UserPermission(Bytes.toBytes("user3"),
-              Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE), table);
+              Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE),
+          connection.getTable(AccessControlLists.ACL_TABLE_NAME));
     }
     ListMultimap<String,TablePermission> perms = AccessControlLists.getTablePermissions(conf, null);
     List<TablePermission> user1Perms = perms.get("user1");
@@ -448,11 +460,11 @@ public class TestTablePermissions {
     // currently running user is the system user and should have global admin perms
     User currentUser = User.getCurrent();
     assertTrue(authManager.authorize(currentUser, Permission.Action.ADMIN));
-    try (Connection connection = ConnectionFactory.createConnection(conf);
-        Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
       for (int i=1; i<=50; i++) {
-        AccessControlLists.addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i),
-            Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE), table);
+        addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i),
+            Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE),
+            connection.getTable(AccessControlLists.ACL_TABLE_NAME));
         // make sure the system user still shows as authorized
         assertTrue("Failed current user auth check on iter "+i,
             authManager.authorize(currentUser, Permission.Action.ADMIN));


Mime
View raw message