From: stack@apache.org To: commits@hbase.apache.org Subject: hbase git commit: HBASE-15913 Sasl encryption doesn't work with AsyncRpcChannel Date: Tue, 7 Jun 2016 16:34:07 +0000 (UTC) Repository: hbase Updated Branches: refs/heads/branch-1.3 05e1013b0 -> f51dfe108 HBASE-15913 Sasl encryption doesn't work with AsyncRpcChannel Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f51dfe10 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f51dfe10 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f51dfe10 Branch: refs/heads/branch-1.3 Commit: f51dfe1086efd2634f9da127b0f1c48d5b54556b Parents: 05e1013 Author: stack Authored: Tue Jun 7 09:21:58 2016 -0700 Committer: stack Committed: Tue Jun 7 09:33:54 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/ipc/AsyncRpcChannel.java | 61 +++- .../hbase/security/SaslClientHandler.java | 22 +- .../hbase/security/AbstractTestSecureIPC.java | 305 +++++++++++++++++++ .../hbase/security/TestAsyncSecureIPC.java | 33 ++ .../hadoop/hbase/security/TestSecureIPC.java | 33 ++ .../hadoop/hbase/security/TestSecureRPC.java | 291 ------------------ 6 files changed, 441 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f51dfe10/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 9ab17f5..93063b8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -240,6 +240,30 @@ public class AsyncRpcChannel { } /** + * Start HBase connection with sasl encryption + * @param ch channel to start 
connection on + */ + private void startConnectionWithEncryption(Channel ch) { + // for rpc encryption, the order of ChannelInboundHandler should be: + // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder + // Don't skip the first 4 bytes for length in beforeUnwrapDecoder, + // SaslClientHandler will handler this + ch.pipeline().addFirst("beforeUnwrapDecoder", + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0)); + ch.pipeline().addLast("afterUnwrapDecoder", + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + ch.pipeline().addLast(new AsyncServerResponseHandler(this)); + List callsToWrite; + synchronized (pendingCalls) { + connected = true; + callsToWrite = new ArrayList(pendingCalls.values()); + } + for (AsyncCall call : callsToWrite) { + writeRequest(call); + } + } + + /** * Get SASL handler * @param bootstrap to reconnect to * @return new SASL handler @@ -251,6 +275,7 @@ public class AsyncRpcChannel { client.fallbackAllowed, client.conf.get("hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()), + getChannelHeaderBytes(authMethod), new SaslClientHandler.SaslExceptionHandler() { @Override public void handle(int retryCount, Random random, Throwable cause) { @@ -274,6 +299,11 @@ public class AsyncRpcChannel { public void onSuccess(Channel channel) { startHBaseConnection(channel); } + + @Override + public void onSaslProtectionSucess(Channel channel) { + startConnectionWithEncryption(channel); + } }); } @@ -357,6 +387,26 @@ public class AsyncRpcChannel { * @throws java.io.IOException on failure to write */ private ChannelFuture writeChannelHeader(Channel channel) throws IOException { + RPCProtos.ConnectionHeader header = getChannelHeader(authMethod); + int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); + + ByteBuf b = channel.alloc().directBuffer(totalSize); + + b.writeInt(header.getSerializedSize()); + b.writeBytes(header.toByteArray()); + + return channel.writeAndFlush(b); + } + + private byte[] getChannelHeaderBytes(AuthMethod authMethod) { + RPCProtos.ConnectionHeader header = getChannelHeader(authMethod); + ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4); + b.putInt(header.getSerializedSize()); + b.put(header.toByteArray()); + return b.array(); + } + + private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) { RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder() .setServiceName(serviceName); @@ -373,16 +423,7 @@ public class AsyncRpcChannel { } headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo()); - RPCProtos.ConnectionHeader header = headerBuilder.build(); - - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); - - ByteBuf b = channel.alloc().directBuffer(totalSize); - - b.writeInt(header.getSerializedSize()); - b.writeBytes(header.toByteArray()); - - return channel.writeAndFlush(b); + return headerBuilder.build(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/f51dfe10/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java index f52987b..c79cde7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java @@ -61,6 +61,7 @@ public class SaslClientHandler extends ChannelDuplexHandler { private final SaslExceptionHandler exceptionHandler; private final SaslSuccessfulConnectHandler successfulConnectHandler; private byte[] saslToken; + private byte[] connectionHeader; private boolean firstRead = true; private int retryCount = 0; @@ -82,10 +83,11 @@ public class SaslClientHandler extends ChannelDuplexHandler { */ public SaslClientHandler(UserGroupInformation ticket, AuthMethod method, Token token, String serverPrincipal, boolean fallbackAllowed, - String rpcProtection, SaslExceptionHandler exceptionHandler, + String rpcProtection, byte[] connectionHeader, SaslExceptionHandler exceptionHandler, SaslSuccessfulConnectHandler successfulConnectHandler) throws IOException { this.ticket = ticket; this.fallbackAllowed = fallbackAllowed; + this.connectionHeader = connectionHeader; this.exceptionHandler = exceptionHandler; this.successfulConnectHandler = successfulConnectHandler; @@ -236,8 +238,13 @@ public class SaslClientHandler extends ChannelDuplexHandler { if (!useWrap) { ctx.pipeline().remove(this); + successfulConnectHandler.onSuccess(ctx.channel()); + } else { + byte[] wrappedCH = saslClient.wrap(connectionHeader, 0, connectionHeader.length); + // write connection header + writeSaslToken(ctx, wrappedCH); + successfulConnectHandler.onSaslProtectionSucess(ctx.channel()); } - successfulConnectHandler.onSuccess(ctx.channel()); } } // Normal wrapped reading @@ -322,9 +329,11 @@ public class SaslClientHandler extends ChannelDuplexHandler { super.write(ctx, msg, promise); } else { ByteBuf in = (ByteBuf) msg; + byte[] unwrapped = new byte[in.readableBytes()]; + in.readBytes(unwrapped); try { - saslToken = saslClient.wrap(in.array(), in.readerIndex(), in.readableBytes()); + saslToken = saslClient.wrap(unwrapped, 0, unwrapped.length); } catch (SaslException se) { try { saslClient.dispose(); @@ -375,5 +384,12 @@ public class SaslClientHandler extends ChannelDuplexHandler { * @param channel which is successfully authenticated */ public void onSuccess(Channel channel); + + /** + * Runs on success if data protection used in Sasl + * + * @param channel which is successfully authenticated + */ + public void onSaslProtectionSucess(Channel channel); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f51dfe10/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java new file mode 100644 index 0000000..e2eb84b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.RpcServerInterface; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; + +import com.google.common.collect.Lists; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.BlockingService; + +import javax.security.sasl.SaslException; + +public abstract class AbstractTestSecureIPC { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri() + .getPath()); + + static final BlockingService SERVICE = + TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService( + new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { + + @Override + public TestProtos.EmptyResponseProto ping(RpcController controller, + TestProtos.EmptyRequestProto request) + throws ServiceException { + return null; + } + + @Override + public TestProtos.EmptyResponseProto error(RpcController controller, + TestProtos.EmptyRequestProto request) + throws ServiceException { + return null; + } + + @Override + public TestProtos.EchoResponseProto echo(RpcController controller, + TestProtos.EchoRequestProto request) + throws ServiceException { + 
if (controller instanceof PayloadCarryingRpcController) { + PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; + // If cells, scan them to check we are able to iterate what we were given and since + // this is + // an echo, just put them back on the controller creating a new block. Tests our + // block + // building. + CellScanner cellScanner = pcrc.cellScanner(); + List list = null; + if (cellScanner != null) { + list = new ArrayList(); + try { + while (cellScanner.advance()) { + list.add(cellScanner.current()); + } + } catch (IOException e) { + throw new ServiceException(e); + } + } + cellScanner = CellUtil.createCellScanner(list); + ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); + } + return TestProtos.EchoResponseProto.newBuilder() + .setMessage(request.getMessage()).build(); + } + }); + + private static MiniKdc KDC; + private static String HOST = "localhost"; + private static String PRINCIPAL; + + String krbKeytab; + String krbPrincipal; + UserGroupInformation ugi; + Configuration clientConf; + Configuration serverConf; + + abstract Class getRpcClientClass(); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() throws Exception { + Properties conf = MiniKdc.createConf(); + conf.put(MiniKdc.DEBUG, true); + KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath())); + KDC.start(); + PRINCIPAL = "hbase/" + HOST; + KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL); + HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); + HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); + } + + @AfterClass + public static void tearDown() throws IOException { + if (KDC != null) { + KDC.stop(); + } + TEST_UTIL.cleanupTestDir(); + } + + @Before + public void setUpTest() throws Exception { + krbKeytab = getKeytabFileForTesting(); + krbPrincipal = getPrincipalForTesting(); + ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); + clientConf = getSecuredConfiguration(); + clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, getRpcClientClass().getName()); + serverConf = getSecuredConfiguration(); + } + + @Test + public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception { + UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser(); + + // check that the login user is okay: + assertSame(ugi, ugi2); + assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod()); + assertEquals(krbPrincipal, ugi.getUserName()); + + callRpcService(User.create(ugi2)); + } + + @Test + public void testRpcFallbackToSimpleAuth() throws Exception { + String clientUsername = "testuser"; + UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername, + new String[]{clientUsername}); + + // check that the client user is insecure + assertNotSame(ugi, clientUgi); + assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod()); + assertEquals(clientUsername, clientUgi.getUserName()); + + clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); + serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true); + callRpcService(User.create(clientUgi)); + } + + void setRpcProtection(String clientProtection, String serverProtection) { + clientConf.set("hbase.rpc.protection", clientProtection); + serverConf.set("hbase.rpc.protection", serverProtection); + } + + /** + * Test various qpos of Server and Client. 
+ * @throws Exception + */ + @Test + public void testSaslWithCommonQop() throws Exception { + setRpcProtection("authentication", "authentication"); + callRpcService(User.create(ugi)); + + setRpcProtection("integrity", "integrity"); + callRpcService(User.create(ugi)); + + setRpcProtection("privacy", "privacy"); + callRpcService(User.create(ugi)); + } + + private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal) + throws Exception { + Configuration cnf = new Configuration(); + cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(cnf); + UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab); + return UserGroupInformation.getLoginUser(); + } + + /** + * Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown + * from the stub, this function will throw root cause of that exception. + */ + private void callRpcService(User clientUser) throws Exception { + SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class); + Mockito.when(securityInfoMock.getServerPrincipal()) + .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); + SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock); + + InetSocketAddress isa = new InetSocketAddress(HOST, 0); + + RpcServerInterface rpcServer = + new RpcServer(null, "AbstractTestSecureIPC", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa, + serverConf, new FifoRpcScheduler(serverConf, 1)); + rpcServer.start(); + try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf, + HConstants.DEFAULT_CLUSTER_ID.toString())) { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + BlockingRpcChannel channel = + rpcClient.createBlockingRpcChannel( + ServerName.valueOf(address.getHostName(), address.getPort(), + System.currentTimeMillis()), clientUser, 0); + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); + TestThread th1 = new TestThread(stub); + final Throwable exception[] = new Throwable[1]; + Collections.synchronizedList(new ArrayList()); + Thread.UncaughtExceptionHandler exceptionHandler = + new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + exception[0] = ex; + } + }; + th1.setUncaughtExceptionHandler(exceptionHandler); + th1.start(); + th1.join(); + if (exception[0] != null) { + // throw root cause. 
+ while (exception[0].getCause() != null) { + exception[0] = exception[0].getCause(); + } + throw (Exception) exception[0]; + } + } finally { + rpcServer.stop(); + } + } + + public static class TestThread extends Thread { + private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub; + + public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) { + this.stub = stub; + } + + @Override + public void run() { + try { + int[] messageSize = new int[] {100, 1000, 10000}; + for (int i = 0; i < messageSize.length; i++) { + String input = RandomStringUtils.random(messageSize[i]); + String result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder() + .setMessage(input).build()).getMessage(); + assertEquals(input, result); + } + } catch (ServiceException e) { + throw new RuntimeException(e); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/f51dfe10/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java new file mode 100644 index 0000000..2fc270d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security; + +import org.apache.hadoop.hbase.ipc.AsyncRpcClient; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.experimental.categories.Category; + +@Category({ SecurityTests.class, SmallTests.class }) +public class TestAsyncSecureIPC extends AbstractTestSecureIPC { + + Class getRpcClientClass() { + return AsyncRpcClient.class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/f51dfe10/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java new file mode 100644 index 0000000..baaa985 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security; + +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientImpl; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.experimental.categories.Category; + +@Category({ SecurityTests.class, SmallTests.class }) +public class TestSecureIPC extends AbstractTestSecureIPC { + + Class getRpcClientClass() { + return RpcClientImpl.class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/f51dfe10/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java deleted file mode 100644 index a5700d0..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java +++ /dev/null @@ -1,291 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.security; - -import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting; -import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; -import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertSame; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ThreadLocalRandom; - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.ipc.AsyncRpcClient; -import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcClient; -import org.apache.hadoop.hbase.ipc.RpcClientFactory; -import org.apache.hadoop.hbase.ipc.RpcClientImpl; -import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.ipc.RpcServerInterface; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.minikdc.MiniKdc; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; - -import com.google.common.collect.Lists; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.BlockingService; - -@Category(SmallTests.class) -public class TestSecureRPC { - - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri() - .getPath()); - - static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService( - new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { - - @Override - public TestProtos.EmptyResponseProto ping(RpcController controller, - TestProtos.EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public TestProtos.EmptyResponseProto error(RpcController controller, - TestProtos.EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public TestProtos.EchoResponseProto echo(RpcController controller, - TestProtos.EchoRequestProto request) - throws ServiceException { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; - // If cells, scan them to check we are able to iterate what we were given and since - // this is - // an echo, just put them back on the controller creating a new block. Tests our - // block - // building. 
- CellScanner cellScanner = pcrc.cellScanner(); - List list = null; - if (cellScanner != null) { - list = new ArrayList(); - try { - while (cellScanner.advance()) { - list.add(cellScanner.current()); - } - } catch (IOException e) { - throw new ServiceException(e); - } - } - cellScanner = CellUtil.createCellScanner(list); - ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); - } - return TestProtos.EchoResponseProto.newBuilder() - .setMessage(request.getMessage()).build(); - } - }); - - private static MiniKdc KDC; - - private static String HOST = "localhost"; - - private static String PRINCIPAL; - - @BeforeClass - public static void setUp() throws Exception { - Properties conf = MiniKdc.createConf(); - conf.put(MiniKdc.DEBUG, true); - KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath())); - KDC.start(); - PRINCIPAL = "hbase/" + HOST; - KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL); - HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); - HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); - } - - @AfterClass - public static void tearDown() throws IOException { - if (KDC != null) { - KDC.stop(); - } - TEST_UTIL.cleanupTestDir(); - } - - @Test - public void testRpc() throws Exception { - testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class); - } - - @Test - public void testRpcWithInsecureFallback() throws Exception { - testRpcFallbackToSimpleAuth(RpcClientImpl.class); - } - - @Test - public void testAsyncRpc() throws Exception { - testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class); - } - - @Test - public void testAsyncRpcWithInsecureFallback() throws Exception { - testRpcFallbackToSimpleAuth(AsyncRpcClient.class); - } - - private void testRpcCallWithEnabledKerberosSaslAuth(Class rpcImplClass) - throws Exception { - String krbKeytab = getKeytabFileForTesting(); - String krbPrincipal = getPrincipalForTesting(); - - UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); - UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser(); - - // check that the login user is okay: - assertSame(ugi, ugi2); - assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod()); - assertEquals(krbPrincipal, ugi.getUserName()); - - Configuration clientConf = getSecuredConfiguration(); - callRpcService(rpcImplClass, User.create(ugi2), clientConf, false); - } - - private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal) - throws Exception { - Configuration cnf = new Configuration(); - cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - UserGroupInformation.setConfiguration(cnf); - UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab); - return UserGroupInformation.getLoginUser(); - } - - private void callRpcService(Class rpcImplClass, User clientUser, - Configuration clientConf, boolean allowInsecureFallback) - throws Exception { - Configuration clientConfCopy = new Configuration(clientConf); - clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName()); - - Configuration conf = getSecuredConfiguration(); - conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback); - - SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class); - Mockito.when(securityInfoMock.getServerPrincipal()) - .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); - SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock); - - InetSocketAddress isa = new 
InetSocketAddress(HOST, 0); - - RpcServerInterface rpcServer = - new RpcServer(null, "AbstractTestSecureIPC", - Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa, - conf, new FifoRpcScheduler(conf, 1)); - rpcServer.start(); - try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf, - HConstants.DEFAULT_CLUSTER_ID.toString())) { - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - BlockingRpcChannel channel = - rpcClient.createBlockingRpcChannel( - - ServerName.valueOf(address.getHostName(), address.getPort(), - System.currentTimeMillis()), clientUser, 5000); - TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = - TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); - List results = new ArrayList(); - TestThread th1 = new TestThread(stub, results); - th1.start(); - th1.join(); - - } finally { - rpcServer.stop(); - } - } - - public void testRpcFallbackToSimpleAuth(Class rpcImplClass) throws Exception { - String krbKeytab = getKeytabFileForTesting(); - String krbPrincipal = getPrincipalForTesting(); - - UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); - assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod()); - assertEquals(krbPrincipal, ugi.getUserName()); - - String clientUsername = "testuser"; - UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername, - new String[]{clientUsername}); - - // check that the client user is insecure - assertNotSame(ugi, clientUgi); - assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod()); - assertEquals(clientUsername, clientUgi.getUserName()); - - Configuration clientConf = new Configuration(); - clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); - callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true); - } - - public static class TestThread extends Thread { - private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub; - - private final List results; - - public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List results) { - this.stub = stub; - this.results = results; - } - - @Override - public void run() { - String result; - try { - result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(String.valueOf( - ThreadLocalRandom.current().nextInt())).build()).getMessage(); - } catch (ServiceException e) { - throw new RuntimeException(e); - } - if (results != null) { - synchronized (results) { - results.add(result); - } - } - } - } -}
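
For readers following the change to startConnectionWithEncryption() above: the point of the two LengthFieldBasedFrameDecoder instances is ordering. The outer "beforeUnwrapDecoder" frames whole wrapped SASL packets but keeps the 4-byte length in place (initialBytesToStrip = 0) so that SaslClientHandler can read and unwrap complete packets; the inner "afterUnwrapDecoder" then frames the RPC response found inside the unwrapped bytes and strips its 4-byte length before AsyncServerResponseHandler sees it. What follows is a minimal standalone sketch of that decoder ordering, not HBase code: it assumes Netty 4.x on the classpath and substitutes a hypothetical DummyUnwrapHandler (which merely drops the outer length prefix) for the real SaslClientHandler, whose unwrap step would call SaslClient.unwrap().

import java.nio.charset.StandardCharsets;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;

public class SaslFramingSketch {

  // Hypothetical stand-in for SaslClientHandler's read path: a real handler would call
  // SaslClient.unwrap() on the payload; here we only strip the outer 4-byte length that
  // beforeUnwrapDecoder deliberately left in place (initialBytesToStrip = 0).
  static class DummyUnwrapHandler extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
      msg.skipBytes(4);                             // outer SASL packet length
      out.add(msg.readBytes(msg.readableBytes()));  // "unwrapped" bytes: a length-prefixed RPC response
    }
  }

  public static void main(String[] args) {
    EmbeddedChannel ch = new EmbeddedChannel(
        // beforeUnwrapDecoder: frame whole wrapped SASL packets, keep the length field
        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0),
        new DummyUnwrapHandler(),
        // afterUnwrapDecoder: frame the RPC response inside the unwrapped bytes, strip the length
        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));

    // One wrapped packet on the wire: [outer length][inner RPC length]["pong"].
    byte[] rpc = "pong".getBytes(StandardCharsets.UTF_8);
    ByteBuf wire = Unpooled.buffer();
    wire.writeInt(4 + rpc.length);   // length of the wrapped SASL packet body
    wire.writeInt(rpc.length);       // length prefix of the RPC response itself
    wire.writeBytes(rpc);

    ch.writeInbound(wire);
    ByteBuf decoded = (ByteBuf) ch.readInbound();
    System.out.println(decoded.toString(StandardCharsets.UTF_8));  // prints "pong"
  }
}

Whether the wrapped path is taken at all depends on the negotiated quality of protection: in the new AbstractTestSecureIPC, client and server each set "hbase.rpc.protection" to authentication, integrity, or privacy, and only the latter two put SaslClientHandler into its useWrap mode. That is also why the connection header itself is now wrapped and sent through the SASL layer (onSaslProtectionSucess) instead of being written raw by writeChannelHeader().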