From commits-return-84929-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Fri Apr 12 07:09:15 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id BE58B180621 for ; Fri, 12 Apr 2019 09:09:14 +0200 (CEST) Received: (qmail 31867 invoked by uid 500); 12 Apr 2019 07:09:13 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 31823 invoked by uid 99); 12 Apr 2019 07:09:13 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Apr 2019 07:09:13 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id A69B2814CA; Fri, 12 Apr 2019 07:09:13 +0000 (UTC) Date: Fri, 12 Apr 2019 07:09:14 +0000 To: "commits@hbase.apache.org" Subject: [hbase] 02/13: HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: zhangduo@apache.org In-Reply-To: <155505295272.4969.17653673293982837413@gitbox.apache.org> References: <155505295272.4969.17653673293982837413@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: hbase X-Git-Refname: refs/heads/HBASE-21512 X-Git-Reftype: branch X-Git-Rev: afe5399b389c39cdbf4b1c75966c0c853077f6bc X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190412070913.A69B2814CA@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-21512 in repository https://gitbox.apache.org/repos/asf/hbase.git commit afe5399b389c39cdbf4b1c75966c0c853077f6bc Author: zhangduo AuthorDate: Sat Dec 1 21:15:48 2018 +0800 HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager --- .../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 5 +- .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 7 ++- .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../hbase/regionserver/SecureBulkLoadManager.java | 24 ++++----- .../hadoop/hbase/security/token/TokenUtil.java | 57 +++++++++++++++++----- .../hadoop/hbase/security/token/TestTokenUtil.java | 42 ++++++++++++---- 6 files changed, 96 insertions(+), 41 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index a3d49b5..d9e620b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -261,13 +261,12 @@ public final class ProtobufUtil { * just {@link ServiceException}. Prefer this method to * {@link #getRemoteException(ServiceException)} because trying to * contain direct protobuf references. - * @param e */ - public static IOException handleRemoteException(Exception e) { + public static IOException handleRemoteException(Throwable e) { return makeIOExceptionOfException(e); } - private static IOException makeIOExceptionOfException(Exception e) { + private static IOException makeIOExceptionOfException(Throwable e) { Throwable t = e; if (e instanceof ServiceException || e instanceof org.apache.hbase.thirdparty.com.google.protobuf.ServiceException) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 336c59c..0f9575b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ByteBufferExtendedCell; @@ -124,6 +123,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -345,13 +345,12 @@ public final class ProtobufUtil { * just {@link ServiceException}. Prefer this method to * {@link #getRemoteException(ServiceException)} because trying to * contain direct protobuf references. - * @param e */ - public static IOException handleRemoteException(Exception e) { + public static IOException handleRemoteException(Throwable e) { return makeIOExceptionOfException(e); } - private static IOException makeIOExceptionOfException(Exception e) { + private static IOException makeIOExceptionOfException(Throwable e) { Throwable t = e; if (e instanceof ServiceException) { t = e.getCause(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b0b9fe5..9a16036 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1939,7 +1939,7 @@ public class HRegionServer extends HasThread implements if (!isStopped() && !isAborted()) { initializeThreads(); } - this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); + this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection); this.secureBulkLoadManager.start(); // Health checker thread. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index 6b55744..d54be75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; - import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -38,11 +37,12 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; import org.apache.hadoop.hbase.security.token.FsDelegationToken; import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -56,7 +56,9 @@ import org.apache.hadoop.security.token.Token; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; @@ -111,9 +113,9 @@ public class SecureBulkLoadManager { private UserProvider userProvider; private ConcurrentHashMap ugiReferenceCounter; - private Connection conn; + private AsyncConnection conn; - SecureBulkLoadManager(Configuration conf, Connection conn) { + SecureBulkLoadManager(Configuration conf, AsyncConnection conn) { this.conf = conf; this.conn = conn; } @@ -218,23 +220,23 @@ public class SecureBulkLoadManager { familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath())); } - Token userToken = null; + Token userToken = null; if (userProvider.isHadoopSecurityEnabled()) { - userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken() - .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text( - request.getFsToken().getService())); + userToken = new Token<>(request.getFsToken().getIdentifier().toByteArray(), + request.getFsToken().getPassword().toByteArray(), new Text(request.getFsToken().getKind()), + new Text(request.getFsToken().getService())); } final String bulkToken = request.getBulkToken(); User user = getActiveUser(); final UserGroupInformation ugi = user.getUGI(); if (userProvider.isHadoopSecurityEnabled()) { try { - Token tok = TokenUtil.obtainToken(conn); + Token tok = TokenUtil.obtainToken(conn).get(); if (tok != null) { boolean b = ugi.addToken(tok); LOG.debug("token added " + tok + " for user " + ugi + " return=" + b); } - } catch (IOException ioe) { + } catch (Exception ioe) { LOG.warn("unable to add token", ioe); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java index c54d905..28efb84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,27 +15,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.security.token; +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; - -import com.google.protobuf.ByteString; -import com.google.protobuf.ServiceException; - -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.AuthenticationService; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenRequest; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenResponse; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; @@ -45,6 +47,8 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + /** * Utility methods for obtaining authentication tokens. */ @@ -64,12 +68,39 @@ public class TokenUtil { /** * Obtain and return an authentication token for the current user. + * @param conn The async HBase cluster connection + * @return the authentication token instance, wrapped by a {@link CompletableFuture}. + */ + public static CompletableFuture> obtainToken( + AsyncConnection conn) { + CompletableFuture> future = new CompletableFuture<>(); + if (injectedException != null) { + future.completeExceptionally(injectedException); + return future; + } + AsyncTable table = conn.getTable(TableName.META_TABLE_NAME); + table. coprocessorService( + AuthenticationProtos.AuthenticationService::newStub, + (s, c, r) -> s.getAuthenticationToken(c, + AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance(), r), + HConstants.EMPTY_START_ROW).whenComplete((resp, error) -> { + if (error != null) { + future.completeExceptionally(ProtobufUtil.handleRemoteException(error)); + } else { + future.complete(toToken(resp.getToken())); + } + }); + return future; + } + + /** + * Obtain and return an authentication token for the current user. * @param conn The HBase cluster connection * @throws IOException if a remote error or serialization problem occurs. * @return the authentication token instance */ - public static Token obtainToken( - Connection conn) throws IOException { + public static Token obtainToken(Connection conn) + throws IOException { Table meta = null; try { injectFault(); @@ -77,9 +108,9 @@ public class TokenUtil { meta = conn.getTable(TableName.META_TABLE_NAME); CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW); AuthenticationProtos.AuthenticationService.BlockingInterface service = - AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel); - AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null, - AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance()); + AuthenticationService.newBlockingStub(rpcChannel); + GetAuthenticationTokenResponse response = + service.getAuthenticationToken(null, GetAuthenticationTokenRequest.getDefaultInstance()); return toToken(response.getToken()); } catch (ServiceException se) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java index 32fcddb..585a3ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java @@ -18,35 +18,53 @@ package org.apache.hadoop.hbase.security.token; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; +import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.net.URL; import java.net.URLClassLoader; - +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @Category(SmallTests.class) public class TestTokenUtil { + @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestTokenUtil.class); + HBaseClassTestRule.forClass(TestTokenUtil.class); - @Test - public void testObtainToken() throws Exception { + private URLClassLoader cl; + + @Before + public void setUp() { URL urlPU = ProtobufUtil.class.getProtectionDomain().getCodeSource().getLocation(); URL urlTU = TokenUtil.class.getProtectionDomain().getCodeSource().getLocation(); + cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader()); + } - ClassLoader cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader()); + @After + public void tearDown() throws IOException { + Closeables.close(cl, true); + } + @Test + public void testObtainToken() throws Exception { Throwable injected = new com.google.protobuf.ServiceException("injected"); Class tokenUtil = cl.loadClass(TokenUtil.class.getCanonicalName()); @@ -55,8 +73,7 @@ public class TestTokenUtil { shouldInjectFault.set(null, injected); try { - tokenUtil.getMethod("obtainToken", Connection.class) - .invoke(null, new Object[] { null }); + tokenUtil.getMethod("obtainToken", Connection.class).invoke(null, new Object[] { null }); fail("Should have injected exception."); } catch (InvocationTargetException e) { Throwable t = e; @@ -72,9 +89,16 @@ public class TestTokenUtil { } } + CompletableFuture future = (CompletableFuture) tokenUtil + .getMethod("obtainToken", AsyncConnection.class).invoke(null, new Object[] { null }); + try { + future.get(); + fail("Should have injected exception."); + } catch (ExecutionException e) { + assertSame(injected, e.getCause()); + } Boolean loaded = (Boolean) cl.loadClass(ProtobufUtil.class.getCanonicalName()) - .getDeclaredMethod("isClassLoaderLoaded") - .invoke(null); + .getDeclaredMethod("isClassLoaderLoaded").invoke(null); assertFalse("Should not have loaded DynamicClassLoader", loaded); } }