Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AE795200BF5 for ; Sat, 24 Dec 2016 05:10:53 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id AD279160B1F; Sat, 24 Dec 2016 04:10:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 604A0160B37 for ; Sat, 24 Dec 2016 05:10:52 +0100 (CET) Received: (qmail 32422 invoked by uid 500); 24 Dec 2016 04:10:46 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 32155 invoked by uid 99); 24 Dec 2016 04:10:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 24 Dec 2016 04:10:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 04632DFC70; Sat, 24 Dec 2016 04:10:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Sat, 24 Dec 2016 04:10:45 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] hbase git commit: HBASE-17174 Refactor the AsyncProcess, BufferedMutatorImpl, and HTable archived-at: Sat, 24 Dec 2016 04:10:53 -0000 Repository: hbase Updated Branches: refs/heads/master 992e5717d -> 8cb55c408 http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java new file mode 100644 index 0000000..b46e572 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java @@ -0,0 +1,336 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RequestController.ReturnCode; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ClientTests.class, SmallTests.class}) +public class TestSimpleRequestController { + + private static final TableName DUMMY_TABLE + = TableName.valueOf("DUMMY_TABLE"); + private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes(); + private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes(); + private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes(); + private static final ServerName SN = ServerName.valueOf("s1:1,1"); + private static final ServerName SN2 = ServerName.valueOf("s2:2,2"); + private static final ServerName SN3 = ServerName.valueOf("s3:3,3"); + private static final HRegionInfo HRI1 + = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); + private static final HRegionInfo HRI2 + = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2); + private static final HRegionInfo HRI3 + = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3); + private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN); + private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN); + private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2); + + @Test + public void testIllegalRequestSize() { + testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1); + } + + @Test + public void testIllegalRsTasks() { + testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1); + } + + @Test + public void testIllegalRegionTasks() { + testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1); + } + + @Test + public void testIllegalSubmittedSize() { + testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1); + } + + private void testIllegalArgument(String key, long value) { + Configuration conf = HBaseConfiguration.create(); + conf.setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1); + try { + SimpleRequestController controller = new SimpleRequestController(conf); + fail("The " + key + " must be bigger than zero"); + } catch (IllegalArgumentException e) { + } + } + + private static Put createPut(long maxHeapSizePerRequest) { + return new Put(Bytes.toBytes("row")) { + @Override + public long heapSize() { + return maxHeapSizePerRequest; + } + }; + } + + @Test + public void testTaskCheckerHost() throws IOException { + final int maxTotalConcurrentTasks = 100; + final int maxConcurrentTasksPerServer = 2; + final int maxConcurrentTasksPerRegion = 1; + final AtomicLong tasksInProgress = new AtomicLong(0); + final Map taskCounterPerServer = new HashMap<>(); + final Map taskCounterPerRegion = new HashMap<>(); + SimpleRequestController.TaskCountChecker countChecker = new SimpleRequestController.TaskCountChecker( + maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, taskCounterPerServer, taskCounterPerRegion); + final long maxHeapSizePerRequest = 2 * 1024 * 1024; + // unlimiited + SimpleRequestController.RequestSizeChecker sizeChecker = new SimpleRequestController.RequestSizeChecker(maxHeapSizePerRequest); + RequestController.Checker checker = SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker)); + ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest)); + assertEquals(ReturnCode.INCLUDE, loc1Code); + + ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest)); + // rejected for size + assertNotEquals(ReturnCode.INCLUDE, loc1Code_2); + + ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest)); + // rejected for size + assertNotEquals(ReturnCode.INCLUDE, loc2Code); + + // fill the task slots for LOC3. + taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100)); + + ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L)); + // rejected for count + assertNotEquals(ReturnCode.INCLUDE, loc3Code); + + // release the task slots for LOC3. + taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0)); + taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0)); + + ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L)); + assertEquals(ReturnCode.INCLUDE, loc3Code_2); + } + + @Test + public void testRequestSizeCheckerr() throws IOException { + final long maxHeapSizePerRequest = 2 * 1024 * 1024; + SimpleRequestController.RequestSizeChecker checker + = new SimpleRequestController.RequestSizeChecker(maxHeapSizePerRequest); + + // inner state is unchanged. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); + assertEquals(ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); + assertEquals(ReturnCode.INCLUDE, code); + } + + // accept the data located on LOC1 region. + ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); + assertEquals(ReturnCode.INCLUDE, acceptCode); + checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest); + + // the sn server reachs the limit. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); + assertNotEquals(ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); + assertNotEquals(ReturnCode.INCLUDE, code); + } + + // the request to sn2 server should be accepted. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest); + assertEquals(ReturnCode.INCLUDE, code); + } + + checker.reset(); + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); + assertEquals(ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); + assertEquals(ReturnCode.INCLUDE, code); + } + } + + @Test + public void testSubmittedSizeChecker() { + final long maxHeapSizeSubmit = 2 * 1024 * 1024; + SimpleRequestController.SubmittedSizeChecker checker + = new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit); + + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(LOC1, 100000); + assertEquals(ReturnCode.INCLUDE, include); + } + + for (int i = 0; i != 10; ++i) { + checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit); + } + + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(LOC1, 100000); + assertEquals(ReturnCode.END, include); + } + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(LOC2, 100000); + assertEquals(ReturnCode.END, include); + } + checker.reset(); + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(LOC1, 100000); + assertEquals(ReturnCode.INCLUDE, include); + } + } + + @Test + public void testTaskCountChecker() throws InterruptedIOException { + long rowSize = 12345; + int maxTotalConcurrentTasks = 100; + int maxConcurrentTasksPerServer = 2; + int maxConcurrentTasksPerRegion = 1; + AtomicLong tasksInProgress = new AtomicLong(0); + Map taskCounterPerServer = new HashMap<>(); + Map taskCounterPerRegion = new HashMap<>(); + SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker( + maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, taskCounterPerServer, taskCounterPerRegion); + + // inner state is unchanged. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(LOC1, rowSize); + assertEquals(ReturnCode.INCLUDE, code); + } + // add LOC1 region. + ReturnCode code = checker.canTakeOperation(LOC1, rowSize); + assertEquals(ReturnCode.INCLUDE, code); + checker.notifyFinal(code, LOC1, rowSize); + + // fill the task slots for LOC1. + taskCounterPerRegion.put(LOC1.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100)); + + // the region was previously accepted, so it must be accpted now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(LOC1, rowSize); + assertEquals(ReturnCode.INCLUDE, includeCode); + checker.notifyFinal(includeCode, LOC1, rowSize); + } + + // fill the task slots for LOC3. + taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100)); + + // no task slots. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode excludeCode = checker.canTakeOperation(LOC3, rowSize); + assertNotEquals(ReturnCode.INCLUDE, excludeCode); + checker.notifyFinal(excludeCode, LOC3, rowSize); + } + + // release the tasks for LOC3. + taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0)); + taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0)); + + // add LOC3 region. + ReturnCode code3 = checker.canTakeOperation(LOC3, rowSize); + assertEquals(ReturnCode.INCLUDE, code3); + checker.notifyFinal(code3, LOC3, rowSize); + + // the region was previously accepted, so it must be accpted now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(LOC3, rowSize); + assertEquals(ReturnCode.INCLUDE, includeCode); + checker.notifyFinal(includeCode, LOC3, rowSize); + } + + checker.reset(); + // the region was previously accepted, + // but checker have reseted and task slots for LOC1 is full. + // So it must be rejected now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(LOC1, rowSize); + assertNotEquals(ReturnCode.INCLUDE, includeCode); + checker.notifyFinal(includeCode, LOC1, rowSize); + } + } + + @Test + public void testWaitForMaximumCurrentTasks() throws Exception { + final AtomicInteger max = new AtomicInteger(0); + final CyclicBarrier barrier = new CyclicBarrier(2); + SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create()); + final AtomicLong tasks = controller.tasksInProgress; + Runnable runnable = () -> { + try { + barrier.await(); + controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null); + } catch (InterruptedIOException e) { + Assert.fail(e.getMessage()); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (BrokenBarrierException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }; + // First test that our runnable thread only exits when tasks is zero. + Thread t = new Thread(runnable); + t.start(); + barrier.await(); + t.join(); + // Now assert we stay running if max == zero and tasks is > 0. + barrier.reset(); + tasks.set(1000000); + t = new Thread(runnable); + t.start(); + barrier.await(); + while (tasks.get() > 0) { + assertTrue(t.isAlive()); + tasks.set(tasks.get() - 1); + } + t.join(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index ee89609..e5ab3e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -126,11 +126,8 @@ public class HConnectionTestingUtility { NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(c.getNonceGenerator()).thenReturn(ng); Mockito.when(c.getAsyncProcess()).thenReturn( - new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false, - RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT))); + new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), false, + RpcControllerFactory.instantiate(conf))); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index 53488ec..2c5e89d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.AsyncProcessTask; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; @@ -137,14 +138,20 @@ public class TestClientPushback { final CountDownLatch latch = new CountDownLatch(1); final AtomicLong endTime = new AtomicLong(); long startTime = EnvironmentEdgeManager.currentTime(); - - ((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback() { - @Override - public void update(byte[] region, byte[] row, Result result) { + BufferedMutatorImpl mutator = ((HTable) table).mutator; + Batch.Callback callback = (byte[] r, byte[] row, Result result) -> { endTime.set(EnvironmentEdgeManager.currentTime()); latch.countDown(); - } - }, true); + }; + AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) + .setPool(mutator.getPool()) + .setTableName(tableName) + .setRowAccess(ops) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) + .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout()) + .setRpcTimeout(60 * 1000) + .build(); + mutator.getAsyncProcess().submit(task); // Currently the ExponentialClientBackoffPolicy under these test conditions // produces a backoffTime of 151 milliseconds. This is long enough so the // wait and related checks below are reasonable. Revisit if the backoff http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 6d1e1f0..0f7f3d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -563,9 +563,17 @@ public class TestReplicasClient { gets.add(g); Object[] results = new Object[2]; - AsyncRequestFuture reqs = ap.submitAll( - HTable.getDefaultExecutor(HTU.getConfiguration()), - table.getName(), gets, null, results); + int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout(); + int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout(); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(HTable.getDefaultExecutor(HTU.getConfiguration())) + .setTableName(table.getName()) + .setRowAccess(gets) + .setResults(results) + .setOperationTimeout(operationTimeout) + .setRpcTimeout(readTimeout) + .build(); + AsyncRequestFuture reqs = ap.submit(task); reqs.waitUntilDone(); // verify we got the right results back for (Object r : results) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index be41e54..295f47a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -501,7 +501,6 @@ public class TestPerColumnFamilyFlush { Thread.sleep(100); } } - table.close(); assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion)); assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound); assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound); http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java index 68fffb1..380c252 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java @@ -171,22 +171,35 @@ public class TestTablePermissions { } } + /** + * The AccessControlLists.addUserPermission may throw exception before closing the table. + */ + private void addUserPermission(Configuration conf, UserPermission userPerm, Table t) throws IOException { + try { + AccessControlLists.addUserPermission(conf, userPerm, t); + } finally { + t.close(); + } + } + @Test public void testBasicWrite() throws Exception { Configuration conf = UTIL.getConfiguration(); - try (Connection connection = ConnectionFactory.createConnection(conf); - Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) { + try (Connection connection = ConnectionFactory.createConnection(conf)) { // add some permissions - AccessControlLists.addUserPermission(conf, + addUserPermission(conf, new UserPermission(Bytes.toBytes("george"), TEST_TABLE, null, (byte[])null, - UserPermission.Action.READ, UserPermission.Action.WRITE), table); - AccessControlLists.addUserPermission(conf, + UserPermission.Action.READ, UserPermission.Action.WRITE), + connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("hubert"), TEST_TABLE, null, (byte[])null, - UserPermission.Action.READ), table); - AccessControlLists.addUserPermission(conf, + UserPermission.Action.READ), + connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("humphrey"), TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER, - UserPermission.Action.READ), table); + UserPermission.Action.READ), + connection.getTable(AccessControlLists.ACL_TABLE_NAME)); } // retrieve the same ListMultimap perms = @@ -274,23 +287,22 @@ public class TestTablePermissions { @Test public void testPersistence() throws Exception { Configuration conf = UTIL.getConfiguration(); - try (Connection connection = ConnectionFactory.createConnection(conf); - Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) { - AccessControlLists.addUserPermission(conf, + try (Connection connection = ConnectionFactory.createConnection(conf)) { + addUserPermission(conf, new UserPermission(Bytes.toBytes("albert"), TEST_TABLE, null, - (byte[])null, TablePermission.Action.READ), table); - AccessControlLists.addUserPermission(conf, + (byte[])null, TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("betty"), TEST_TABLE, null, (byte[])null, TablePermission.Action.READ, - TablePermission.Action.WRITE), table); - AccessControlLists.addUserPermission(conf, + TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("clark"), TEST_TABLE, TEST_FAMILY, - TablePermission.Action.READ), table); - AccessControlLists.addUserPermission(conf, + TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("dwight"), TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER, - TablePermission.Action.WRITE), table); + TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); } // verify permissions survive changes in table metadata ListMultimap preperms = @@ -404,17 +416,17 @@ public class TestTablePermissions { Configuration conf = UTIL.getConfiguration(); // add some permissions - try (Connection connection = ConnectionFactory.createConnection(conf); - Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) { - AccessControlLists.addUserPermission(conf, + try (Connection connection = ConnectionFactory.createConnection(conf)) { + addUserPermission(conf, new UserPermission(Bytes.toBytes("user1"), - Permission.Action.READ, Permission.Action.WRITE), table); - AccessControlLists.addUserPermission(conf, + Permission.Action.READ, Permission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("user2"), - Permission.Action.CREATE), table); - AccessControlLists.addUserPermission(conf, + Permission.Action.CREATE), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("user3"), - Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE), table); + Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE), + connection.getTable(AccessControlLists.ACL_TABLE_NAME)); } ListMultimap perms = AccessControlLists.getTablePermissions(conf, null); List user1Perms = perms.get("user1"); @@ -448,11 +460,11 @@ public class TestTablePermissions { // currently running user is the system user and should have global admin perms User currentUser = User.getCurrent(); assertTrue(authManager.authorize(currentUser, Permission.Action.ADMIN)); - try (Connection connection = ConnectionFactory.createConnection(conf); - Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) { + try (Connection connection = ConnectionFactory.createConnection(conf)) { for (int i=1; i<=50; i++) { - AccessControlLists.addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i), - Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE), table); + addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i), + Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE), + connection.getTable(AccessControlLists.ACL_TABLE_NAME)); // make sure the system user still shows as authorized assertTrue("Failed current user auth check on iter "+i, authManager.authorize(currentUser, Permission.Action.ADMIN));