sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject sentry git commit: SENTRY-1477: Sentry clients should retry with another server when they get connection errors (Li Li, reviewed by Alexander Kolbasov, Hao Hao)
Date Wed, 09 Nov 2016 19:08:33 GMT
Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 97c9f7398 -> d8425a8d8


SENTRY-1477: Sentry clients should retry with another server when they get connection errors
(Li Li, reviewed by Alexander Kolbasov, Hao Hao)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/d8425a8d
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/d8425a8d
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/d8425a8d

Branch: refs/heads/sentry-ha-redesign
Commit: d8425a8d89ad7b9a8d9e597322b456909d60cfa8
Parents: 97c9f73
Author: Li Li <lili@apache.org>
Authored: Tue Nov 8 21:11:13 2016 -0800
Committer: Li Li <lili@apache.org>
Committed: Tue Nov 8 21:11:13 2016 -0800

----------------------------------------------------------------------
 .../SentryPolicyServiceClientDefaultImpl.java   | 124 +++++++++++++---
 .../provider/db/service/thrift/ThriftUtil.java  |  15 ++
 .../thrift/PoolClientInvocationHandler.java     |  87 +++---------
 .../thrift/RetryClientInvocationHandler.java    | 142 +++++++++++++++++++
 .../thrift/SentryClientInvocationHandler.java   |   2 +-
 .../thrift/SentryServiceClientFactory.java      |   5 +-
 .../sentry/service/thrift/ServiceConstants.java |  15 ++
 .../thrift/TestPoolClientInvocationHandler.java |  20 ++-
 .../tests/e2e/hive/TestPolicyImportExport.java  |   4 +-
 9 files changed, 319 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index 4f42a51..2dc8af8 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -21,17 +21,19 @@ package org.apache.sentry.provider.db.service.thrift;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import javax.security.auth.callback.CallbackHandler;
 
+import com.google.common.net.HostAndPort;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sentry.core.common.exception.SentryUserException;
@@ -73,15 +75,17 @@ import com.google.common.collect.Sets;
 public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyServiceClient {
 
   private final Configuration conf;
-  private final InetSocketAddress serverAddress;
   private final boolean kerberos;
-  private final String[] serverPrincipalParts;
+  private String[] serverPrincipalParts;
   private SentryPolicyService.Client client;
   private TTransport transport;
   private int connectionTimeout;
   private static final Logger LOGGER = LoggerFactory
                                        .getLogger(SentryPolicyServiceClient.class);
   private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred ";
+  // configs for connection retry
+  private int connectionFullRetryTotal;
+  private List<InetSocketAddress> endpoints;
 
   /**
    * This transport wraps the Sasl transports to set up the right UGI context for open().
@@ -131,40 +135,121 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
     }
   }
 
-  public SentryPolicyServiceClientDefaultImpl(Configuration conf) throws IOException {
-    this(Preconditions.checkNotNull(conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key
"
-        + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt(
-        ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT), conf);
+  /**
+   * Initialize the sentry configurations.
+   */
+  public SentryPolicyServiceClientDefaultImpl(Configuration conf)
+      throws IOException {
+    this.conf = conf;
+    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
+    this.connectionTimeout = conf.getInt(ServiceConstants.ClientConfig.SERVER_RPC_CONN_TIMEOUT,
+        ServiceConstants.ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT);
+    this.connectionFullRetryTotal = conf.getInt(ServiceConstants.ClientConfig.SENTRY_FULL_RETRY_TOTAL,
+        ServiceConstants.ClientConfig.SENTRY_FULL_RETRY_TOTAL_DEFAULT);
+    this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT,
+        ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT);
+    this.kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
+        conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim());
+
+    String hostsAndPortsStr = conf.get(ServiceConstants.ClientConfig.SERVER_RPC_ADDRESS);
+    if (hostsAndPortsStr == null) {
+      throw new RuntimeException("Config key " +
+          ServiceConstants.ClientConfig.SERVER_RPC_ADDRESS + " is required");
+    }
+    int defaultPort = conf.getInt(ServiceConstants.ClientConfig.SERVER_RPC_PORT,
+        ServiceConstants.ClientConfig.SERVER_RPC_PORT_DEFAULT);
+    String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+    HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort);
+    this.endpoints = new ArrayList(hostsAndPortsStrArr.length);
+    for (int i = hostsAndPortsStrArr.length - 1; i >= 0 ; i--) {
+      this.endpoints.add(
+          new InetSocketAddress(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort()));
+      LOGGER.debug("Added server endpoint: " + hostsAndPorts[i].toString());
+    }
   }
 
   public SentryPolicyServiceClientDefaultImpl(String addr, int port,
         Configuration conf) throws IOException {
     this.conf = conf;
     Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
-    this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull(
+    InetSocketAddress serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull(
                             addr, "Config key " + ClientConfig.SERVER_RPC_ADDRESS
                             + " is required"), port);
     this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT,
                                          ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT);
-    kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
+    this.kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
         conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim());
+    connect(serverAddress);
+  }
+
+  /**
+   * This is a no-op when already connected.
+   * When there is a connection error, it will retry with another sentry server. It will
+   * first cycle through all the available sentry servers, and then retry the whole server
+   * list no more than connectionFullRetryTotal times. In this case, it won't introduce
+   * more latency when some server fails. Also to prevent all clients connecting to the
+   * same server, it will reorder the endpoints randomly after a full retry.
+   * <p>
+   * TODO: Have a small random sleep after a full retry to prevent all clients connecting
to the same server.
+   * <p>
+   * TODO: Add metrics for the number of successful connects and errors per client, and total
number of retries.
+   */
+  public synchronized void connectWithRetry() throws IOException {
+    if (isConnected()) {
+      return;
+    }
+    IOException currentException = null;
+    // Here for each full connectWithRetry it will cycle through all available sentry
+    // servers. Before each full connectWithRetry, it will shuffle the server list.
+    for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
+      // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
+      // at the same time after a node failure.
+      Collections.shuffle(endpoints);
+      for (InetSocketAddress addr : endpoints) {
+        try {
+          connect(addr);
+          LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString()));
+          return;
+        } catch (IOException e) {
+          LOGGER.debug(String.format("Failed connection to %s: %s",
+              addr.toString(), e.getMessage()), e);
+          currentException = e;
+        }
+      }
+    }
+
+    // Throw exception as reaching the max full connectWithRetry number.
+    LOGGER.error(
+        String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
+        currentException);
+    throw currentException;
+  }
+
+  /**
+   * Connect to the specified socket address and throw IOException if failed.
+   */
+  private void connect(InetSocketAddress serverAddress) throws IOException {
     transport = new TSocket(serverAddress.getHostName(),
         serverAddress.getPort(), connectionTimeout);
     if (kerberos) {
-      String serverPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL),
ServerConfig.PRINCIPAL + " is required");
+      String serverPrincipal = Preconditions.checkNotNull(
+          conf.get(ServiceConstants.ServerConfig.PRINCIPAL),
+          ServiceConstants.ServerConfig.PRINCIPAL + " is required");
 
       // Resolve server host in the same way as we are doing on server side
-      serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+      serverPrincipal =
+          SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
       LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
 
       serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
       Preconditions.checkArgument(serverPrincipalParts.length == 3,
-           "Kerberos principal should have 3 parts: " + serverPrincipal);
+          "Kerberos principal should have 3 parts: " + serverPrincipal);
       boolean wrapUgi = "true".equalsIgnoreCase(conf
-          .get(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true"));
-      transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(),
+          .get(ServiceConstants.ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true"));
+      transport = new SentryPolicyServiceClientDefaultImpl.UgiSaslClientTransport(
+          SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
           null, serverPrincipalParts[0], serverPrincipalParts[1],
-          ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi);
+          ServiceConstants.ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi);
     } else {
       serverPrincipalParts = null;
     }
@@ -174,7 +259,8 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
       throw new IOException("Transport exception while opening transport: " + e.getMessage(),
e);
     }
     LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress);
-    long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
+    long maxMessageSize = conf.getLong(
+        ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
         ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
     TMultiplexedProtocol protocol = new TMultiplexedProtocol(
         new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true),
@@ -941,11 +1027,15 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
   }
 
   public synchronized void close() {
-    if (transport != null) {
+    if (isConnected()) {
       transport.close();
     }
   }
 
+  private boolean isConnected() {
+    return transport != null && transport.isOpen();
+  }
+
   /**
    * Import the sentry mapping data, convert the mapping data from map structure to
    * TSentryMappingData, and call the import API.

http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java
index 3a96d0b..5fed04a 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.sentry.provider.db.service.thrift;
 
+import com.google.common.net.HostAndPort;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TSaslClientTransport;
 import org.apache.thrift.transport.TSaslServerTransport;
@@ -109,4 +110,18 @@ public final class ThriftUtil {
   private ThriftUtil() {
     // Make constructor private to avoid instantiation
   }
+
+  /**
+   * Utility function for parsing host and port strings. Expected form should be
+   * (host:port). The hostname could be in ipv6 style. If port is not specified,
+   * defaultPort will be used.
+   */
+  public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort)
{
+    HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length];
+    for (int i = 0; i < hostsAndPorts.length; i++) {
+     hostsAndPorts[i] =
+          HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort);
+    }
+    return hostsAndPorts;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
index 730bfec..d5f4fcb 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
@@ -17,10 +17,11 @@
  */
 package org.apache.sentry.service.thrift;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HostAndPort;
 import org.apache.commons.pool2.PooledObjectFactory;
 import org.apache.commons.pool2.impl.AbandonedConfig;
 import org.apache.commons.pool2.impl.GenericObjectPool;
@@ -28,6 +29,7 @@ import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
+import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
 import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -138,74 +140,13 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler
{
     }
     int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT,
         ClientConfig.SERVER_RPC_PORT_DEFAULT);
-    String[] hostsAndPorts = hostsAndPortsStr.split(",");
-    String[] hosts = new String[hostsAndPorts.length];
-    int[] ports = new int[hostsAndPorts.length];
-    parseHostPortStrings(hostsAndPortsStr, hostsAndPorts, hosts,
-        ports, defaultPort);
+    String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+    HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort);
     this.endpoints = new Endpoint[hostsAndPorts.length];
     for (int i = 0; i < this.endpoints.length; i++) {
-      this.endpoints[i] = new Endpoint(hosts[i], ports[i]);
-      LOGGER.info("Initiate sentry sever endpoint: hostname " + hosts[i] + ", port " + ports[i]);
-    }
-  }
-
-  @VisibleForTesting
-  /**
-   * Utility function for parsing host and port strings. Expected form should be
-   * (host:port). The hostname could be in ipv6 style. Port number can be empty
-   * and will be default to defaultPort.
-   */
-  static protected void parseHostPortStrings(String hostsAndPortsStr,
-        String[] hostsAndPorts, String[] hosts, int[] ports,
-        int defaultPort) {
-    int i = -1;
-    for (String hostAndPort: hostsAndPorts) {
-      i++;
-      hostAndPort = hostAndPort.trim();
-      if (hostAndPort.isEmpty()) {
-        throw new RuntimeException("Cannot handle empty server RPC address " +
-            "in component " + (i + 1) + " of " + hostsAndPortsStr);
-      }
-      int colonIdx = hostAndPort.lastIndexOf(":");
-      if (colonIdx == -1) {
-        // There is no colon in the host+port string.
-        // That means the port is left unspecified, and should be set to
-        // the default.
-        hosts[i] = hostAndPort;
-        ports[i] = defaultPort;
-        continue;
-      }
-      int rightBracketIdx = hostAndPort.indexOf(']', colonIdx);
-      if (rightBracketIdx != -1) {
-        // If there is a right bracket that occurs after the colon, the
-        // colon we found is part of an ipv6 address like this:
-        // [::1].  That means we only have the host part, not the port part.
-        hosts[i] = hostAndPort.substring(0, rightBracketIdx);
-        ports[i] = defaultPort;
-        continue;
-      }
-      // We have a host:port string, where the part after colon should be
-      // the port.
-      hosts[i] = hostAndPort.substring(0, colonIdx);
-      String portStr = hostAndPort.substring(colonIdx+1);
-      try {
-        ports[i] = Integer.valueOf(portStr);
-      } catch (NumberFormatException e) {
-        throw new RuntimeException("Cannot parse port string " + portStr +
-            "in component " + (i + 1) + " of " + hostsAndPortsStr);
-      }
-      if ((ports[i] < 0) || (ports[i] > 65535)) {
-        throw new RuntimeException("Invalid port number given for " + portStr +
-            "in component " + (i + 1) + " of " + hostsAndPortsStr);
-      }
-    }
-    // Strip the brackets off of hostnames and ip addresses enclosed in square
-    // brackets.  This is to support ipv6-style [address]:port addressing.
-    for (int j = 0; j < hosts.length; j++) {
-      if ((hosts[j].startsWith("[")) && (hosts[j].endsWith("]"))) {
-        hosts[j] = hosts[j].substring(1, hosts[j].length() - 1);
-      }
+      this.endpoints[i] = new Endpoint(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort());
+      LOGGER.info("Initiate sentry sever endpoint: hostname " +
+          hostsAndPorts[i].getHostText() + ", port " + hostsAndPorts[i].getPort());
     }
   }
 
@@ -298,6 +239,11 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler
{
       client = pool.borrowObject();
     } catch (Exception e) {
       LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
+      // If the exception is caused by connection problem, throw the TTransportException
+      // for reconnect.
+      if (e instanceof IOException) {
+        throw new TTransportException(e);
+      }
       throw new SentryUserException(e.getMessage(), e);
     }
     try {
@@ -305,13 +251,14 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler
{
       result = method.invoke(client, args);
     } catch (InvocationTargetException e) {
       // Get the target exception, check if SentryUserException or TTransportException is
wrapped.
-      // TTransportException means there has connection problem with the pool.
+      // TTransportException or IOException means there has connection problem with the pool.
       Throwable targetException = e.getCause();
       if (targetException instanceof SentryUserException) {
         Throwable sentryTargetException = targetException.getCause();
         // If there has connection problem, eg, invalid connection if the service restarted,
-        // sentryTargetException instanceof TTransportException = true.
-        if (sentryTargetException instanceof TTransportException) {
+        // sentryTargetException instanceof TTransportException or IOException = true.
+        if (sentryTargetException instanceof TTransportException
+            || sentryTargetException instanceof IOException) {
           // If the exception is caused by connection problem, destroy the instance and
           // remove it from the commons-pool. Throw the TTransportException for reconnect.
           pool.invalidateObject(client);

http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java
new file mode 100644
index 0000000..c4964c3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * The RetryClientInvocationHandler is a proxy class for handling thrift calls for non-pool
+ * model. Currently only one client connection is allowed, and it's using lazy connection.
+ * The client is not connected to the sentry server until there is any rpc call.
+ * <p>
+ * For every rpc call, if the client is not connected, it will first connect to a sentry
+ * server, and then do the thrift call to the connected sentry server, which will execute
+ * the requested method and return back the response. If it is failed with connection
+ * problem, it will close the current connection and retry (reconnect and resend the
+ * thrift call) no more than rpcRetryTotal times. If the client is already connected, it
+ * will reuse the existing connection, and do the thrift call.
+ * <p>
+ * During reconnection, it will first cycle through all the available sentry servers, and
+ * then retry the whole server list no more than connectionFullRetryTotal times. In this
+ * case, it won't introduce more latency when some server fails. Also to prevent all
+ * clients connecting to the same server, it will reorder the endpoints randomly after a
+ * full retry.
+ * <p>
+ * TODO: allow multiple client connections
+ */
+class RetryClientInvocationHandler extends SentryClientInvocationHandler{
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(RetryClientInvocationHandler.class);
+  private final Configuration conf;
+  private SentryPolicyServiceClientDefaultImpl client = null;
+  private final int rpcRetryTotal;
+
+  /**
+   * Initialize the sentry configurations, including rpc retry count and client connection
+   * configs for SentryPolicyServiceClientDefaultImpl
+   */
+  RetryClientInvocationHandler(Configuration conf) throws IOException {
+    this.conf = conf;
+    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
+    this.rpcRetryTotal = conf.getInt(ServiceConstants.ClientConfig.SENTRY_RPC_RETRY_TOTAL,
+        ServiceConstants.ClientConfig.SENTRY_RPC_RETRY_TOTAL_DEFAULT);
+    client = new SentryPolicyServiceClientDefaultImpl(conf);
+  }
+
+  /**
+   * For every rpc call, if the client is not connected, it will first connect to a sentry
+   * server, and then do the thrift call to the connected sentry server, which will
+   * execute the requested method and return back the response. If it is failed with
+   * connection problem, it will close the current connection, and retry (reconnect and
+   * resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException
+   * if failed retry after rpcRetryTotal times.
+   * Synchronized it for thread safety.
+   */
+  @Override
+  synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception
{
+    int retryCount = 0;
+    Exception lastExc = null;
+
+    while (retryCount < rpcRetryTotal) {
+      // Connect to a sentry server if not connected yet.
+      try {
+        client.connectWithRetry();
+      } catch (IOException e) {
+        // Increase the retry num
+        // Retry when the exception is caused by connection problem.
+        retryCount++;
+        lastExc = e;
+        close();
+        continue;
+      }
+
+      // do the thrift call
+      try {
+        return method.invoke(client, args);
+      } catch (InvocationTargetException e) {
+        // Get the target exception, check if SentryUserException or TTransportException
is wrapped.
+        // TTransportException means there has connection problem with the pool.
+        Throwable targetException = e.getCause();
+        if (targetException instanceof SentryUserException) {
+          Throwable sentryTargetException = targetException.getCause();
+          // If there has connection problem, eg, invalid connection if the service restarted,
+          // sentryTargetException instanceof TTransportException = true.
+          if (sentryTargetException instanceof TTransportException) {
+            // Retry when the exception is caused by connection problem.
+            lastExc = new TTransportException(sentryTargetException);
+            LOGGER.debug("Got TTransportException when do the thrift call ", lastExc);
+          } else {
+            // The exception is thrown by thrift call, eg, SentryAccessDeniedException.
+            // Do not need to reconnect to the sentry server.
+            throw (SentryUserException) targetException;
+          }
+        } else {
+          throw e;
+        }
+      }
+
+      // Increase the retry num
+      retryCount++;
+
+      // For connection problem, it will close the current connection, and reconnect to
+      // an available sentry server and redo the thrift call.
+      close();
+    }
+    // Throw the exception as reaching the max rpc retry num.
+    LOGGER.error(String.format("failed after %d retries ", rpcRetryTotal), lastExc);
+    throw new SentryUserException(
+        String.format("failed after %d retries ", rpcRetryTotal), lastExc);
+  }
+
+  @Override
+  public void close() {
+    client.close();
+    LOGGER.debug("Close the current client connection");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java
index a41be7f..b8c7f23 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java
@@ -44,7 +44,7 @@ public abstract class SentryClientInvocationHandler implements InvocationHandler
   /**
    * Subclass should implement this method for special function
    */
-  public abstract Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception;
+  abstract Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception;
 
   /**
    * An abstract method "close", an invocationHandler should close its contexts at here.

http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
index b7d2be1..f822497 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
@@ -40,7 +40,10 @@ public final class SentryServiceClientFactory {
               SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
               new PoolClientInvocationHandler(conf));
     } else {
-      return new SentryPolicyServiceClientDefaultImpl(conf);
+      return (SentryPolicyServiceClient) Proxy
+          .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
+              SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
+              new RetryClientInvocationHandler(conf));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index a249904..06a9571 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -26,6 +26,7 @@ import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
 
 public class ServiceConstants {
 
@@ -237,6 +238,20 @@ public class ServiceConstants {
     public static final String SENTRY_POOL_RETRY_TOTAL = "sentry.service.client.connection.pool.retry-total";
     public static final int SENTRY_POOL_RETRY_TOTAL_DEFAULT = 3;
 
+    /**
+     * full retry num for getting the connection in non-pool model
+     * In a full retry, it will cycle through all available sentry servers
+     * {@link SentryPolicyServiceClientDefaultImpl#connectWithRetry()}
+     */
+    public static final String SENTRY_FULL_RETRY_TOTAL = "sentry.service.client.connection.full.retry-total";
+    public static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = 2;
+    /**
+     * max retry num for client rpc
+     * {@link RetryClientInvocationHandler#invokeImpl(Object, Method, Object[])}
+     */
+    public static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total";
+    public static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3;
+
     // max message size for thrift messages
     public static final String SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE = "sentry.policy.client.thrift.max.message.size";
     public static final long SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = 100 *
1024 * 1024;

http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
index 5b0e12b..7292387 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.sentry.service.thrift;
 
+import com.google.common.net.HostAndPort;
+import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -30,12 +32,10 @@ public class TestPoolClientInvocationHandler {
   private void expectParseHostPortStrings(String hostsAndPortsStr,
         String[] expectedHosts, int[] expectedPorts) throws Exception {
     boolean success = false;
-    String[] hostsAndPorts = hostsAndPortsStr.split(",");
-    String[] hosts = new String[hostsAndPorts.length];
-    int[] ports = new int[hostsAndPorts.length];
+    String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+    HostAndPort[] hostsAndPorts;
     try {
-      PoolClientInvocationHandler.parseHostPortStrings(hostsAndPortsStr,
-          hostsAndPorts, hosts, ports, 8038);
+      hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, 8038);
       success = true;
     } finally {
       if (!success) {
@@ -43,12 +43,22 @@ public class TestPoolClientInvocationHandler {
             hostsAndPortsStr);
       }
     }
+    String[] hosts = new String[hostsAndPortsStrArr.length];
+    int[] ports = new int[hostsAndPortsStrArr.length];
+    parseHostsAndPorts(hostsAndPorts, hosts, ports);
     Assert.assertArrayEquals("Got unexpected hosts results while " +
         "parsing " + hostsAndPortsStr, expectedHosts, hosts);
     Assert.assertArrayEquals("Got unexpected ports results while " +
         "parsing " + hostsAndPortsStr, expectedPorts, ports);
   }
 
+  private void parseHostsAndPorts(HostAndPort[] hostsAndPorts, String[] hosts, int[] ports)
{
+    for (int i = 0; i < hostsAndPorts.length; i++) {
+      hosts[i] = hostsAndPorts[i].getHostText();
+      ports[i] = hostsAndPorts[i].getPort();
+    }
+  }
+
   @SuppressWarnings("PMD.AvoidUsingHardCodedIP")
   @Test
   public void testParseHostPortStrings() throws Exception {

http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java
b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java
index 3f57a00..b048989 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
 import java.util.Set;
 
@@ -172,8 +173,9 @@ public class TestPolicyImportExport extends AbstractTestWithStaticConfiguration
     try {
       configTool.importPolicy();
       fail("IllegalArgumentException should be thrown for: Invalid key value: server [server]");
-    } catch (IllegalArgumentException ex) {
+    } catch (UndeclaredThrowableException ex) {
       // ignore
+      assertTrue(ex.getUndeclaredThrowable().getCause() instanceof IllegalArgumentException);
     }
   }
 


Mime
View raw message