Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8F5EA200C48 for ; Thu, 6 Apr 2017 23:36:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8DFDA160B9F; Thu, 6 Apr 2017 21:36:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8B5D6160B91 for ; Thu, 6 Apr 2017 23:36:02 +0200 (CEST) Received: (qmail 8428 invoked by uid 500); 6 Apr 2017 21:36:01 -0000 Mailing-List: contact commits-help@sentry.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sentry.apache.org Delivered-To: mailing list commits@sentry.apache.org Received: (qmail 8387 invoked by uid 99); 6 Apr 2017 21:36:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Apr 2017 21:36:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 97055F21A4; Thu, 6 Apr 2017 21:36:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akolb@apache.org To: commits@sentry.apache.org Date: Thu, 06 Apr 2017 21:36:04 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/5] sentry git commit: Client Failover reorg prototype archived-at: Thu, 06 Apr 2017 21:36:04 -0000 Client Failover reorg prototype Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c9c0119f Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c9c0119f Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c9c0119f Branch: refs/heads/SENTRY-1593-akolb Commit: 
c9c0119fc9e61615b9445d989dd63c9395a1b21c Parents: e3d859a Author: Alexander Kolbasov Authored: Thu Apr 6 14:35:46 2017 -0700 Committer: Alexander Kolbasov Committed: Thu Apr 6 14:35:46 2017 -0700 ---------------------------------------------------------------------- .../transport/RetryClientInvocationHandler.java | 22 +- .../SentryClientTransportConfigInterface.java | 2 +- .../common/transport/SentryServiceClient.java | 48 --- ...SentryServiceClientTransportDefaultImpl.java | 342 ------------------- .../core/common/transport/SentrySocket.java | 32 ++ .../transport/SentryTransportFactory.java | 234 +++++++++++++ .../sentry/hdfs/SentryHDFSServiceClient.java | 5 +- .../SentryHDFSServiceClientDefaultImpl.java | 43 ++- .../hdfs/SentryHDFSServiceClientFactory.java | 11 +- .../thrift/SentryGenericServiceClient.java | 5 +- .../SentryGenericServiceClientDefaultImpl.java | 50 ++- .../SentryGenericServiceClientFactory.java | 8 +- .../thrift/SentryPolicyServiceClient.java | 5 +- .../SentryPolicyServiceClientDefaultImpl.java | 46 ++- .../thrift/SentryServiceClientFactory.java | 25 +- .../thrift/SentryServiceClientPoolFactory.java | 27 +- 16 files changed, 428 insertions(+), 477 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java index b01cb37..86569c9 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java +++ 
b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java @@ -49,16 +49,20 @@ import java.lang.reflect.Method; * TODO(kalyan) allow multiple client connections using PoolClientInvocationHandler */ -public class RetryClientInvocationHandler extends SentryClientInvocationHandler { +public final class RetryClientInvocationHandler extends SentryClientInvocationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(RetryClientInvocationHandler.class); - private SentryServiceClient client = null; + private final int retries; + private final SentrySocket client; /** * Initialize the sentry configurations, including rpc retry count and client connection * configs for SentryPolicyServiceClientDefaultImpl */ - public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject) { + public RetryClientInvocationHandler(Configuration conf, + SentryClientTransportConfigInterface transportConfig, + SentrySocket clientObject) { + retries = transportConfig.getSentryRpcRetryTotal(conf); Preconditions.checkNotNull(conf, "Configuration object cannot be null"); client = clientObject; } @@ -77,18 +81,17 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler synchronized public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception { int retryCount = 0; Exception lastExc = null; - boolean tryAlternateServer = false; - while (retryCount < client.getRetryCount()) { + while (retryCount < retries) { // Connect to a sentry server if not connected yet. try { - client.connectWithRetry(tryAlternateServer); + client.connect(); } catch (IOException e) { // Increase the retry num // Retry when the exception is caused by connection problem. 
retryCount++; lastExc = e; - close(); + client.close(); continue; } @@ -108,7 +111,6 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler // Retry when the exception is caused by connection problem. lastExc = new TTransportException(sentryTargetException); LOGGER.error("Got TTransportException when do the thrift call ", lastExc); - tryAlternateServer = true; // Closing the thrift client on TTransportException. New client object is // created using new socket when an attempt to reconnect is made. close(); @@ -131,9 +133,9 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler } // Throw the exception as reaching the max rpc retry num. - LOGGER.error(String.format("failed after %d retries ", client.getRetryCount()), lastExc); + LOGGER.error(String.format("failed after %d retries ", retries), lastExc); throw new SentryUserException( - String.format("failed after %d retries ", client.getRetryCount()), lastExc); + String.format("failed after %d retries ", retries), lastExc); } @Override http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java index 24192fd..3ea36a1 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java @@ -28,7 +28,7 @@ import org.apache.sentry.core.common.exception.MissingConfigurationException; * This Configuration 
interface should be implemented for all the sentry clients to get * the transport configuration. */ -interface SentryClientTransportConfigInterface { +public interface SentryClientTransportConfigInterface { /** * @param conf configuration * @return number of times client retry logic should iterate through all http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java deleted file mode 100644 index dc93fb7..0000000 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.core.common.transport; - -import java.io.Closeable; - -/** - * Client interface for Proxy Invocation handlers - *

- * Defines interface that Sentry client's should expose to the Invocation handlers like - * RetryClientInvocationHandler used to proxy the method invocation on sentry - * client instances . - *

- * All the sentry clients that need retrying and failover capabilities should implement - * this interface. - */ -public interface SentryServiceClient extends Closeable { - /** - * This is a no-op when already connected. - * When there is a connection error, it will retry with another sentry server. It will - * first cycle through all the available sentry servers, and then retry the whole server - * list no more than connectionFullRetryTotal times. In this case, it won't introduce - * more latency when some server fails. Also to prevent all clients connecting to the - * same server, it will reorder the endpoints randomly after a full retry. - *

- * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries. - */ - void connectWithRetry(boolean tryAlternateServer) throws Exception; - - int getRetryCount(); - - void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java deleted file mode 100644 index 4c126fb..0000000 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java +++ /dev/null @@ -1,342 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.core.common.transport; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.net.HostAndPort; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.sentry.core.common.exception.MissingConfigurationException; -import org.apache.thrift.transport.TSaslClientTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; -import org.apache.sentry.core.common.utils.ThriftUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslException; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; - -/** - * Implements the transport functionality for sentry clients. - * All the sentry clients should extend this class for transport implementation. 
- */ - -public abstract class SentryServiceClientTransportDefaultImpl { - protected final Configuration conf; - protected final boolean kerberos; - private String[] serverPrincipalParts; - - protected TTransport transport; - private final int connectionTimeout; - private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientTransportDefaultImpl.class); - // configs for connection retry - private final int connectionFullRetryTotal; - private final int rpcRetryTotal; - private final ArrayList endpoints; - protected InetSocketAddress serverAddress; - private final SentryClientTransportConfigInterface transportConfig; - private static final ImmutableMap SASL_PROPERTIES = - ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf"); - - /** - * Defines various client types. - */ - protected enum sentryClientType { - POLICY_CLIENT, - HDFS_CLIENT, - } - - /** - * This transport wraps the Sasl transports to set up the right UGI context for open(). - */ - public static class UgiSaslClientTransport extends TSaslClientTransport { - UserGroupInformation ugi = null; - - public UgiSaslClientTransport(String mechanism, String protocol, - String serverName, TTransport transport, - boolean wrapUgi, Configuration conf) - throws IOException, SaslException { - super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null, - transport); - if (wrapUgi) { - // If we don't set the configuration, the UGI will be created based on - // what's on the classpath, which may lack the kerberos changes we require - UserGroupInformation.setConfiguration(conf); - ugi = UserGroupInformation.getLoginUser(); - } - } - - // open the SASL transport with using the current UserGroupInformation - // This is needed to get the current login context stored - @Override - public void open() throws TTransportException { - if (ugi == null) { - baseOpen(); - } else { - try { - if (ugi.isFromKeytab()) { - ugi.checkTGTAndReloginFromKeytab(); - } - ugi.doAs(new PrivilegedExceptionAction() { 
- public Void run() throws TTransportException { - baseOpen(); - return null; - } - }); - } catch (IOException e) { - throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e); - } catch (InterruptedException e) { - throw new TTransportException( - "Interrupted while opening underlying transport: " + e.getMessage(), e); - } - } - } - - private void baseOpen() throws TTransportException { - super.open(); - } - } - - /** - * Initialize the object based on the sentry configuration provided. - * List of configured servers are reordered randomly preventing all - * clients connecting to the same server. - * - * @param conf Sentry configuration - * @param type Type indicates the service type - */ - public SentryServiceClientTransportDefaultImpl(Configuration conf, - sentryClientType type) throws IOException { - - this.conf = conf; - Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - serverPrincipalParts = null; - if (type == sentryClientType.POLICY_CLIENT) { - transportConfig = new SentryPolicyClientTransportConfig(); - } else { - transportConfig = new SentryHDFSClientTransportConfig(); - } - - try { - String hostsAndPortsStr; - this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); - this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf); - this.rpcRetryTotal = transportConfig.getSentryRpcRetryTotal(conf); - this.kerberos = transportConfig.isKerberosEnabled(conf); - - hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf); - - int serverPort = transportConfig.getServerRpcPort(conf); - - String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); - HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort); - - this.endpoints = new ArrayList(hostsAndPortsStrArr.length); - for (HostAndPort endpoint : hostsAndPorts) { - this.endpoints.add( - new InetSocketAddress(endpoint.getHostText(), endpoint.getPort())); - 
LOGGER.debug("Added server endpoint: " + endpoint.toString()); - } - - // Reorder endpoints randomly to prevent all clients connecting to the same endpoint - // at the same time after a node failure. - Collections.shuffle(endpoints); - serverAddress = null; - connectWithRetry(false); - } catch (Exception e) { - throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e); - } - } - - /** - * Initialize object based on the parameters provided provided. - * - * @param addr Host address which the client needs to connect - * @param port Host Port which the client needs to connect - * @param conf Sentry configuration - * @param type Type indicates the service type - */ - public SentryServiceClientTransportDefaultImpl(String addr, int port, Configuration conf, - sentryClientType type) throws IOException { - // copy the configuration because we may make modifications to it. - this.conf = new Configuration(conf); - serverPrincipalParts = null; - Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - if (type == sentryClientType.POLICY_CLIENT) { - transportConfig = new SentryPolicyClientTransportConfig(); - } else { - transportConfig = new SentryHDFSClientTransportConfig(); - } - - try { - InetSocketAddress serverAddress = NetUtils.createSocketAddr(addr, port); - this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); - this.rpcRetryTotal = transportConfig.getSentryRpcRetryTotal(conf); - this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf); - this.kerberos = transportConfig.isKerberosEnabled(conf); - connect(serverAddress); - } catch (MissingConfigurationException e) { - throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e); - } - endpoints = null; - } - - - /** - * no-op when already connected. - * On connection error, Iterates through all the configured servers and tries to connect. 
- * On successful connection, control returns - * On connection failure, continues iterating through all the configured sentry servers, - * and then retries the whole server list no more than connectionFullRetryTotal times. - * In this case, it won't introduce more latency when some server fails. - *

- * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries. - */ - public synchronized void connectWithRetry(boolean tryAlternateServer) throws IOException { - if (isConnected() && (!tryAlternateServer)) { - return; - } - - IOException currentException = null; - for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) { - try { - connectToAvailableServer(); - return; - } catch (IOException e) { - currentException = e; - LOGGER.error( - String.format("Failed to connect to all the configured sentry servers, " + - "Retrying again")); - } - } - // Throw exception as reaching the max full connectWithRetry number. - LOGGER.error( - String.format("Reach the max connection retry num %d ", connectionFullRetryTotal), - currentException); - throw currentException; - } - - /** - * Iterates through all the configured servers and tries to connect. - * On connection error, tries to connect to next server. - * Control returns on successful connection OR it's done trying to all the - * configured servers. - * - * @throws IOException - */ - private void connectToAvailableServer() throws IOException { - IOException currentException = null; - if (endpoints.size() == 1) { - connect(endpoints.get(0)); - return; - } - - for (InetSocketAddress addr : endpoints) { - try { - serverAddress = addr; - connect(serverAddress); - LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString())); - return; - } catch (IOException e) { - LOGGER.error(String.format("Failed connection to %s: %s", - addr.toString(), e.getMessage()), e); - currentException = e; - } - } - throw currentException; - } - - /** - * Connect to the specified socket address and throw IOException if failed. - * - * @param serverAddress Address client needs to connect - * @throws Exception if there is failure in establishing the connection. 
- */ - protected void connect(InetSocketAddress serverAddress) throws IOException { - try { - transport = createTransport(serverAddress); - transport.open(); - } catch (TTransportException e) { - throw new IOException("Failed to open transport: " + e.getMessage(), e); - } catch (MissingConfigurationException e) { - throw new RuntimeException(e.getMessage(), e); - } - - LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress); - } - - /** - * New socket is is created - * - * @param serverAddress - * @return - * @throws TTransportException - * @throws MissingConfigurationException - * @throws IOException - */ - private TTransport createTransport(InetSocketAddress serverAddress) - throws TTransportException, MissingConfigurationException, IOException { - TTransport socket = new TSocket(serverAddress.getHostName(), - serverAddress.getPort(), connectionTimeout); - - if (kerberos) { - String serverPrincipal = transportConfig.getSentryPrincipal(conf); - serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); - LOGGER.debug("Using server kerberos principal: " + serverPrincipal); - if (serverPrincipalParts == null) { - serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); - Preconditions.checkArgument(serverPrincipalParts.length == 3, - "Kerberos principal should have 3 parts: " + serverPrincipal); - } - - boolean wrapUgi = transportConfig.useUserGroupInformation(conf); - return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(), - serverPrincipalParts[0], serverPrincipalParts[1], - socket, wrapUgi, conf); - } else { - return socket; - } - } - - private boolean isConnected() { - return transport != null && transport.isOpen(); - } - - public synchronized void close() { - if (isConnected()) { - transport.close(); - } - } - - public int getRetryCount() { - return rpcRetryTotal; - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java new file mode 100644 index 0000000..3374489 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.core.common.transport; + +import java.io.IOException; + +/** + * General representation of transport connection to Sentry + */ +public interface SentrySocket extends AutoCloseable { + /** + * Connect to the Sentry server + * @throws IOException + */ + void connect() throws IOException; +} http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java new file mode 100644 index 0000000..74ac92d --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.core.common.transport; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sentry.core.common.exception.MissingConfigurationException; +import org.apache.sentry.core.common.utils.ThriftUtil; +import org.apache.thrift.transport.TSaslClientTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; + +/** + * Generate Thrift transports suitable for talking to Sentry + */ +public final class SentryTransportFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class); + + private final Configuration conf; + private final SentryClientTransportConfigInterface transportConfig; + private final ArrayList endpoints; + + public SentryTransportFactory(Configuration conf, + SentryClientTransportConfigInterface configInterface) { + this.conf = conf; + this.transportConfig = configInterface; + String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf); + int 
serverPort = transportConfig.getServerRpcPort(conf); + + String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); + HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort); + this.endpoints = new ArrayList<>(hostsAndPortsStrArr.length); + for (HostAndPort endpoint : hostsAndPorts) { + this.endpoints.add( + new InetSocketAddress(endpoint.getHostText(), endpoint.getPort())); + LOGGER.debug("Added server endpoint: " + endpoint.toString()); + } + // Reorder endpoints randomly to prevent all clients connecting to the same endpoint + // at the same time after a node failure. + if (endpoints.size() > 1) { + Collections.shuffle(endpoints); + } + } + + /** + * This transport wraps the Sasl transports to set up the right UGI context for open(). + */ + private static final class UgiSaslClientTransport extends TSaslClientTransport { + private static final ImmutableMap SASL_PROPERTIES = + ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf"); + + private UserGroupInformation ugi = null; + + private UgiSaslClientTransport(String mechanism, String protocol, + String serverName, TTransport transport, + boolean wrapUgi, Configuration conf) + throws IOException, SaslException { + super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null, + transport); + if (wrapUgi) { + // If we don't set the configuration, the UGI will be created based on + // what's on the classpath, which may lack the kerberos changes we require + UserGroupInformation.setConfiguration(conf); + ugi = UserGroupInformation.getLoginUser(); + } + } + + // open the SASL transport with using the current UserGroupInformation + // This is needed to get the current login context stored + @Override + public void open() throws TTransportException { + if (ugi == null) { + baseOpen(); + } else { + try { + if (ugi.isFromKeytab()) { + ugi.checkTGTAndReloginFromKeytab(); + } + ugi.doAs(new PrivilegedExceptionAction() { + public Void run() throws TTransportException { + 
baseOpen(); + return null; + } + }); + } catch (IOException e) { + throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e); + } catch (InterruptedException e) { + throw new TTransportException( + "Interrupted while opening underlying transport: " + e.getMessage(), e); + } + } + } + + private void baseOpen() throws TTransportException { + super.open(); + } + } + + /** + * On connection error, Iterates through all the configured servers and tries to connect. + * On successful connection, control returns + * On connection failure, continues iterating through all the configured sentry servers, + * and then retries the whole server list no more than connectionFullRetryTotal times. + * In this case, it won't introduce more latency when some server fails. + *

+ * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries. + */ + public TTransport connect() throws IOException { + int connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf); + IOException currentException = null; + for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) { + try { + return connectToAvailableServer(); + } catch (IOException e) { + currentException = e; + LOGGER.error( + String.format("Failed to connect to all the configured sentry servers, " + + "Retrying again")); + } + } + // Throw exception as reaching the max full connectWithRetry number. + LOGGER.error( + String.format("Reach the max connection retry num %d ", connectionFullRetryTotal), + currentException); + throw currentException; + } + + /** + * Iterates through all the configured servers and tries to connect. + * On connection error, tries to connect to next server. + * Control returns on successful connection OR it's done trying all the + * configured servers. + * + * @throws IOException + */ + private TTransport connectToAvailableServer() throws IOException { + IOException currentException = null; + for (InetSocketAddress addr : endpoints) { + try { + return connect(addr); + } catch (IOException e) { + LOGGER.error(String.format("Failed connection to %s: %s", + addr.toString(), e.getMessage()), e); + currentException = e; + } + } + if (currentException != null) { + throw currentException; + } + return null; + } + + /** + * Connect to the specified socket address and throw IOException if failed. + * + * @param serverAddress Address client needs to connect + * @throws IOException if there is failure in establishing the connection. 
+ */ + protected TTransport connect(InetSocketAddress serverAddress) throws IOException { + try { + TTransport transport = createTransport(serverAddress); + transport.open(); + LOGGER.info(String.format("Connected to SentryServer: %s", serverAddress)); + return transport; + } catch (TTransportException e) { + throw new IOException("Failed to open transport: " + e.getMessage(), e); + } catch (MissingConfigurationException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + /** + * New socket is is created + * + * @param serverAddress + * @return + * @throws TTransportException + * @throws MissingConfigurationException + * @throws IOException + */ + private TTransport createTransport(InetSocketAddress serverAddress) + throws TTransportException, MissingConfigurationException, IOException { + TTransport socket = new TSocket(serverAddress.getHostName(), + serverAddress.getPort(), transportConfig.getServerRpcConnTimeoutInMs(conf)); + + if (!transportConfig.isKerberosEnabled(conf)) { + return socket; + } + + String serverPrincipal = transportConfig.getSentryPrincipal(conf); + serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); + LOGGER.debug("Using server kerberos principal: " + serverPrincipal); + String[] serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); + Preconditions.checkArgument(serverPrincipalParts.length == 3, + "Kerberos principal should have 3 parts: " + serverPrincipal); + + boolean wrapUgi = transportConfig.useUserGroupInformation(conf); + return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(), + serverPrincipalParts[0], serverPrincipalParts[1], + socket, wrapUgi, conf); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java ---------------------------------------------------------------------- diff --git 
a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java index faac053..11f6894 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java @@ -18,9 +18,8 @@ package org.apache.sentry.hdfs; import org.apache.sentry.core.common.exception.SentryHdfsServiceException; -import org.apache.sentry.core.common.transport.SentryServiceClient; -public interface SentryHDFSServiceClient extends SentryServiceClient { +public interface SentryHDFSServiceClient { String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService"; void notifyHMSUpdate(PathsUpdate update) @@ -30,5 +29,7 @@ public interface SentryHDFSServiceClient extends SentryServiceClient { SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum) throws SentryHdfsServiceException; + + void close(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java index d337319..794aded 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java @@ -18,12 +18,13 @@ package org.apache.sentry.hdfs; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.LinkedList; import org.apache.hadoop.conf.Configuration; import 
org.apache.sentry.core.common.exception.SentryHdfsServiceException; -import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl; +import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface; +import org.apache.sentry.core.common.transport.SentrySocket; +import org.apache.sentry.core.common.transport.SentryTransportFactory; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client; import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse; @@ -34,6 +35,7 @@ import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TMultiplexedProtocol; import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,28 +49,41 @@ import org.slf4j.LoggerFactory; */ -public class SentryHDFSServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryHDFSServiceClient { +public class SentryHDFSServiceClientDefaultImpl + implements SentryHDFSServiceClient, SentrySocket { private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class); + private final Configuration conf; private Client client; + private SentryTransportFactory transportFactory; + private TTransport transport; - public SentryHDFSServiceClientDefaultImpl(Configuration conf) throws IOException { - super(conf, sentryClientType.HDFS_CLIENT); + + + SentryHDFSServiceClientDefaultImpl(Configuration conf, + SentryClientTransportConfigInterface transportConfig) + throws IOException { + this.conf = conf; + transportFactory = new SentryTransportFactory(conf, transportConfig); } /** * Connect to the specified socket address and then use the new socket * to construct new thrift client. * - * @param serverAddress: socket address to which the client should connect. 
* @throws IOException */ - public void connect(InetSocketAddress serverAddress) throws IOException { - TProtocol tProtocol = null; - super.connect(serverAddress); + @Override + public void connect() throws IOException { + if (isOpen()) { + return; + } + + transport = transportFactory.connect(); long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE, ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); + TProtocol tProtocol = null; if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT, ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) { tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize); @@ -119,4 +134,14 @@ public class SentryHDFSServiceClientDefaultImpl extends SentryServiceClientTrans } return retVal; } + + @Override + public void close() { + transport.close(); + transport = null; + } + + private boolean isOpen() { + return ((transport != null) && transport.isOpen()); + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java index 59ac360..174da4f 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java @@ -21,12 +21,16 @@ import java.lang.reflect.Proxy; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.core.common.transport.RetryClientInvocationHandler; +import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface; +import 
org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig; /** * Client factory to create normal client or proxy with HA invocation handler */ public class SentryHDFSServiceClientFactory { - + private static final SentryClientTransportConfigInterface transportConfig = + new SentryHDFSClientTransportConfig(); + private SentryHDFSServiceClientFactory() { // Make constructor private to avoid instantiation } @@ -36,7 +40,8 @@ public class SentryHDFSServiceClientFactory { return (SentryHDFSServiceClient) Proxy .newProxyInstance(SentryHDFSServiceClientDefaultImpl.class.getClassLoader(), SentryHDFSServiceClientDefaultImpl.class.getInterfaces(), - new RetryClientInvocationHandler(conf, - new SentryHDFSServiceClientDefaultImpl(conf))); + new RetryClientInvocationHandler(conf, transportConfig, + new SentryHDFSServiceClientDefaultImpl(conf, + transportConfig))); } } http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java index c832706..11cdee7 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java @@ -24,9 +24,8 @@ import java.util.Set; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; -import 
org.apache.sentry.core.common.transport.SentryServiceClient; -public interface SentryGenericServiceClient extends SentryServiceClient { +public interface SentryGenericServiceClient { /** * Create a sentry role @@ -192,4 +191,6 @@ public interface SentryGenericServiceClient extends SentryServiceClient { Map listPrivilegsbyAuthorizable(String component, String serviceName, String requestorUserName, Set authorizablesSet, Set groups, ActiveRoleSet roleSet) throws SentryUserException; + + void close(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java index 9bbd736..c9d0357 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java @@ -18,18 +18,16 @@ package org.apache.sentry.provider.db.generic.service.thrift; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.*; import org.apache.hadoop.conf.Configuration; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; - import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; -import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl; -import 
org.apache.sentry.core.common.utils.SentryConstants; +import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface; +import org.apache.sentry.core.common.transport.SentrySocket; +import org.apache.sentry.core.common.transport.SentryTransportFactory; import org.apache.sentry.core.model.db.AccessConstants; import org.apache.sentry.service.thrift.ServiceConstants; import org.apache.sentry.service.thrift.Status; @@ -38,6 +36,7 @@ import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TMultiplexedProtocol; +import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,30 +51,48 @@ import com.google.common.collect.Lists; So it is important to close and re-open the transport so that new socket is used. */ -public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryGenericServiceClient { +public class SentryGenericServiceClientDefaultImpl + implements SentryGenericServiceClient, SentrySocket { + private final SentryTransportFactory transportFactory; + private final Configuration conf; + private TTransport transport; + + private SentryGenericPolicyService.Client client; private static final Logger LOGGER = LoggerFactory .getLogger(SentryGenericServiceClientDefaultImpl.class); private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occured "; - public SentryGenericServiceClientDefaultImpl(Configuration conf) throws IOException { - super(conf, sentryClientType.POLICY_CLIENT); + public SentryGenericServiceClientDefaultImpl(Configuration conf, + SentryClientTransportConfigInterface transportConfig) + throws IOException { + this.conf = conf; + transportFactory = new SentryTransportFactory(conf, transportConfig); + + // TODO - do it correctly + /* if (kerberos) { // since the client uses hadoop-auth, we need to set kerberos in // hadoop-auth if we plan to use kerberos 
conf.set(HADOOP_SECURITY_AUTHENTICATION, SentryConstants.KERBEROS_MODE); } + */ } /** * Connect to the specified socket address and then use the new socket * to construct new thrift client. * - * @param serverAddress: socket address to which the client should connect. * @throws IOException */ - public void connect(InetSocketAddress serverAddress) throws IOException { - super.connect(serverAddress); + @Override + public void connect() throws IOException { + if (isOpen()) { + return; + } + + transport = transportFactory.connect(); + long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); TMultiplexedProtocol protocol = new TMultiplexedProtocol( @@ -84,6 +101,12 @@ public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTr client = new SentryGenericPolicyService.Client(protocol); LOGGER.debug("Successfully created client"); } + + private boolean isOpen() { + return ((transport != null) && transport.isOpen()); + } + + /** * Create a sentry role * @@ -506,4 +529,9 @@ public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTr throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e); } } + + @Override + public void close() { + transport.close(); + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java index 1c582f0..9132449 100644 --- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java @@ -19,6 +19,8 @@ package org.apache.sentry.provider.db.generic.service.thrift; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.core.common.transport.RetryClientInvocationHandler; +import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface; +import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig; import java.lang.reflect.Proxy; @@ -26,6 +28,8 @@ import java.lang.reflect.Proxy; * SentryGenericServiceClientFactory is a public class for the components which using Generic Model to create sentry client. */ public final class SentryGenericServiceClientFactory { + private static final SentryClientTransportConfigInterface transportConfig = + new SentryPolicyClientTransportConfig(); private SentryGenericServiceClientFactory() { } @@ -34,8 +38,8 @@ public final class SentryGenericServiceClientFactory { return (SentryGenericServiceClient) Proxy .newProxyInstance(SentryGenericServiceClientDefaultImpl.class.getClassLoader(), SentryGenericServiceClientDefaultImpl.class.getInterfaces(), - new RetryClientInvocationHandler(conf, - new SentryGenericServiceClientDefaultImpl(conf))); + new RetryClientInvocationHandler(conf, transportConfig, + new SentryGenericServiceClientDefaultImpl(conf, transportConfig))); } } http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java index 28c3e35..3b25db7 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java @@ -25,9 +25,8 @@ import java.util.Set; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; -import org.apache.sentry.core.common.transport.SentryServiceClient; -public interface SentryPolicyServiceClient extends SentryServiceClient { +public interface SentryPolicyServiceClient { void createRole(String requestorUserName, String roleName) throws SentryUserException; @@ -216,4 +215,6 @@ public interface SentryPolicyServiceClient extends SentryServiceClient { // export the sentry mapping data with map structure Map>> exportPolicy(String requestorUserName, String objectPath) throws SentryUserException; + + void close(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java index b4c1a5f..9eb60cc 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java +++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java @@ -19,7 +19,6 @@ package org.apache.sentry.provider.db.service.thrift; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.List; import java.util.Map; import java.util.Set; @@ -31,6 +30,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface; +import org.apache.sentry.core.common.transport.SentrySocket; +import org.apache.sentry.core.common.transport.SentryTransportFactory; import org.apache.sentry.core.model.db.AccessConstants; import org.apache.sentry.core.model.db.DBModelAuthorizable; import org.apache.sentry.core.common.utils.PolicyFileConstants; @@ -42,7 +44,7 @@ import org.apache.sentry.service.thrift.Status; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TMultiplexedProtocol; -import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl; +import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,35 +67,43 @@ import com.google.common.collect.Sets; server this is configured. 
*/ -public class SentryPolicyServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryPolicyServiceClient { +public class SentryPolicyServiceClientDefaultImpl + implements SentryPolicyServiceClient, SentrySocket { + + private final Configuration conf; private SentryPolicyService.Client client; private static final Logger LOGGER = LoggerFactory .getLogger(SentryPolicyServiceClient.class); + private SentryTransportFactory transportFactory; + private TTransport transport; + + private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred "; /** * Initialize the sentry configurations. */ - public SentryPolicyServiceClientDefaultImpl(Configuration conf) + public SentryPolicyServiceClientDefaultImpl(Configuration conf, + SentryClientTransportConfigInterface transportConfig) throws IOException { - super(conf, sentryClientType.POLICY_CLIENT); - } - - public SentryPolicyServiceClientDefaultImpl(String addr, int port, - Configuration conf) throws IOException { - super(addr, port, conf, sentryClientType.POLICY_CLIENT); + this.conf = conf; + this.transportFactory = new SentryTransportFactory(conf, transportConfig); } /** * Connect to the specified socket address and then use the new socket * to construct new thrift client. * - * @param serverAddress: socket address to which the client should connect. 
* @throws IOException */ - public void connect(InetSocketAddress serverAddress) throws IOException { - super.connect(serverAddress); + @Override + public void connect() throws IOException { + if (isOpen()) { + return; + } + transport = transportFactory.connect(); + long maxMessageSize = conf.getLong( ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); @@ -1008,4 +1018,14 @@ public class SentryPolicyServiceClientDefaultImpl extends SentryServiceClientTra } return rolePrivilegesMapForFile; } + + @Override + public void close() { + transport.close(); + transport = null; + } + + private boolean isOpen() { + return ((transport != null) && transport.isOpen()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java index 745dc4c..55c51d3 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java @@ -23,29 +23,24 @@ import java.lang.reflect.Proxy; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.core.common.transport.RetryClientInvocationHandler; +import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface; +import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; import 
org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; -import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; public final class SentryServiceClientFactory { + private static final SentryClientTransportConfigInterface transportConfig = + new SentryPolicyClientTransportConfig(); + private SentryServiceClientFactory() { } public static SentryPolicyServiceClient create(Configuration conf) throws Exception { - boolean pooled = conf.getBoolean( - ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT); - if (pooled) { - return (SentryPolicyServiceClient) Proxy - .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), - SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), - new PoolClientInvocationHandler(conf)); - } else { - return (SentryPolicyServiceClient) Proxy - .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), - SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), - new RetryClientInvocationHandler(conf, - new SentryPolicyServiceClientDefaultImpl(conf))); - } + return (SentryPolicyServiceClient) Proxy + .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), + SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), + new RetryClientInvocationHandler(conf, transportConfig, + new SentryPolicyServiceClientDefaultImpl(conf, transportConfig))); } } http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java index 0164fa6..dd13e0d 100644 --- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java @@ -23,9 +23,9 @@ import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; -import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; /** * SentryServiceClientPoolFactory is for connection pool to manage the object. Implement the related @@ -36,21 +36,21 @@ public class SentryServiceClientPoolFactory extends BasePooledObjectFactory pooledObject) { - SentryPolicyServiceClient client = pooledObject.getObject(); - LOGGER.debug("Destroying Sentry Service Client: " + client); - if (client != null) { - // The close() of TSocket or TSaslClientTransport is called actually, and there has no - // exception even there has some problems, eg, the client is closed already. - // The close here is just try to close the socket and the client will be destroyed soon. - client.close(); - } + throw new NotImplementedException(); } }