sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ak...@apache.org
Subject [5/5] sentry git commit: Client Failover reorg prototype
Date Thu, 06 Apr 2017 21:36:04 GMT
Client Failover reorg prototype


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c9c0119f
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c9c0119f
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c9c0119f

Branch: refs/heads/SENTRY-1593-akolb
Commit: c9c0119fc9e61615b9445d989dd63c9395a1b21c
Parents: e3d859a
Author: Alexander Kolbasov <akolb@cloudera.com>
Authored: Thu Apr 6 14:35:46 2017 -0700
Committer: Alexander Kolbasov <akolb@cloudera.com>
Committed: Thu Apr 6 14:35:46 2017 -0700

----------------------------------------------------------------------
 .../transport/RetryClientInvocationHandler.java |  22 +-
 .../SentryClientTransportConfigInterface.java   |   2 +-
 .../common/transport/SentryServiceClient.java   |  48 ---
 ...SentryServiceClientTransportDefaultImpl.java | 342 -------------------
 .../core/common/transport/SentrySocket.java     |  32 ++
 .../transport/SentryTransportFactory.java       | 234 +++++++++++++
 .../sentry/hdfs/SentryHDFSServiceClient.java    |   5 +-
 .../SentryHDFSServiceClientDefaultImpl.java     |  43 ++-
 .../hdfs/SentryHDFSServiceClientFactory.java    |  11 +-
 .../thrift/SentryGenericServiceClient.java      |   5 +-
 .../SentryGenericServiceClientDefaultImpl.java  |  50 ++-
 .../SentryGenericServiceClientFactory.java      |   8 +-
 .../thrift/SentryPolicyServiceClient.java       |   5 +-
 .../SentryPolicyServiceClientDefaultImpl.java   |  46 ++-
 .../thrift/SentryServiceClientFactory.java      |  25 +-
 .../thrift/SentryServiceClientPoolFactory.java  |  27 +-
 16 files changed, 428 insertions(+), 477 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
index b01cb37..86569c9 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
@@ -49,16 +49,20 @@ import java.lang.reflect.Method;
  * TODO(kalyan) allow multiple client connections using <code>PoolClientInvocationHandler</code>
  */
 
-public class RetryClientInvocationHandler extends SentryClientInvocationHandler {
+public final class RetryClientInvocationHandler extends SentryClientInvocationHandler {
   private static final Logger LOGGER =
     LoggerFactory.getLogger(RetryClientInvocationHandler.class);
-  private SentryServiceClient client = null;
+  private final int retries;
+  private final SentrySocket client;
 
   /**
    * Initialize the sentry configurations, including rpc retry count and client connection
    * configs for SentryPolicyServiceClientDefaultImpl
    */
-  public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject) {
+  public RetryClientInvocationHandler(Configuration conf,
+                                      SentryClientTransportConfigInterface transportConfig,
+                                      SentrySocket clientObject) {
+    retries = transportConfig.getSentryRpcRetryTotal(conf);
     Preconditions.checkNotNull(conf, "Configuration object cannot be null");
     client = clientObject;
   }
@@ -77,18 +81,17 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler
   synchronized public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
     int retryCount = 0;
     Exception lastExc = null;
-    boolean tryAlternateServer = false;
 
-    while (retryCount < client.getRetryCount()) {
+    while (retryCount < retries) {
       // Connect to a sentry server if not connected yet.
       try {
-        client.connectWithRetry(tryAlternateServer);
+        client.connect();
       } catch (IOException e) {
         // Increase the retry num
         // Retry when the exception is caused by connection problem.
         retryCount++;
         lastExc = e;
-        close();
+        client.close();
         continue;
       }
 
@@ -108,7 +111,6 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler
             // Retry when the exception is caused by connection problem.
             lastExc = new TTransportException(sentryTargetException);
             LOGGER.error("Got TTransportException when do the thrift call ", lastExc);
-            tryAlternateServer = true;
             // Closing the thrift client on TTransportException. New client object is
             // created using new socket when an attempt to reconnect is made.
             close();
@@ -131,9 +133,9 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler
     }
 
     // Throw the exception as reaching the max rpc retry num.
-    LOGGER.error(String.format("failed after %d retries ", client.getRetryCount()), lastExc);
+    LOGGER.error(String.format("failed after %d retries ", retries), lastExc);
     throw new SentryUserException(
-      String.format("failed after %d retries ", client.getRetryCount()), lastExc);
+      String.format("failed after %d retries ", retries), lastExc);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
index 24192fd..3ea36a1 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
@@ -28,7 +28,7 @@ import org.apache.sentry.core.common.exception.MissingConfigurationException;
  * This Configuration interface should be implemented for all the sentry clients to get
  * the transport configuration.
  */
-interface SentryClientTransportConfigInterface {
+public interface SentryClientTransportConfigInterface {
   /**
    * @param conf configuration
    * @return number of times client retry logic should iterate through all

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
deleted file mode 100644
index dc93fb7..0000000
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.core.common.transport;
-
-import java.io.Closeable;
-
-/**
- * Client interface for Proxy Invocation handlers
- * <p>
- * Defines interface that Sentry client's should expose to the Invocation handlers like
- * <code>RetryClientInvocationHandler</code> used to proxy the method invocation on sentry
- * client instances .
- * <p>
- * All the sentry clients that need retrying and failover capabilities should implement
- * this interface.
- */
-public interface SentryServiceClient extends Closeable {
-  /**
-   * This is a no-op when already connected.
-   * When there is a connection error, it will retry with another sentry server. It will
-   * first cycle through all the available sentry servers, and then retry the whole server
-   * list no more than connectionFullRetryTotal times. In this case, it won't introduce
-   * more latency when some server fails. Also to prevent all clients connecting to the
-   * same server, it will reorder the endpoints randomly after a full retry.
-   * <p>
-   * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
-   */
-  void connectWithRetry(boolean tryAlternateServer) throws Exception;
-
-  int getRetryCount();
-
-  void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
deleted file mode 100644
index 4c126fb..0000000
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.core.common.transport;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.net.HostAndPort;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.core.common.exception.MissingConfigurationException;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.sentry.core.common.utils.ThriftUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-
-/**
- * Implements the transport functionality for sentry clients.
- * All the sentry clients should extend this class for transport implementation.
- */
-
-public abstract class SentryServiceClientTransportDefaultImpl {
-  protected final Configuration conf;
-  protected final boolean kerberos;
-  private String[] serverPrincipalParts;
-
-  protected TTransport transport;
-  private final int connectionTimeout;
-  private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientTransportDefaultImpl.class);
-  // configs for connection retry
-  private final int connectionFullRetryTotal;
-  private final int rpcRetryTotal;
-  private final ArrayList<InetSocketAddress> endpoints;
-  protected InetSocketAddress serverAddress;
-  private final SentryClientTransportConfigInterface transportConfig;
-  private static final ImmutableMap<String, String> SASL_PROPERTIES =
-    ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
-
-  /**
-   * Defines various client types.
-   */
-  protected enum sentryClientType {
-    POLICY_CLIENT,
-    HDFS_CLIENT,
-  }
-
-  /**
-   * This transport wraps the Sasl transports to set up the right UGI context for open().
-   */
-  public static class UgiSaslClientTransport extends TSaslClientTransport {
-    UserGroupInformation ugi = null;
-
-    public UgiSaslClientTransport(String mechanism, String protocol,
-                                  String serverName, TTransport transport,
-                                  boolean wrapUgi, Configuration conf)
-      throws IOException, SaslException {
-      super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
-        transport);
-      if (wrapUgi) {
-        // If we don't set the configuration, the UGI will be created based on
-        // what's on the classpath, which may lack the kerberos changes we require
-        UserGroupInformation.setConfiguration(conf);
-        ugi = UserGroupInformation.getLoginUser();
-      }
-    }
-
-    // open the SASL transport with using the current UserGroupInformation
-    // This is needed to get the current login context stored
-    @Override
-    public void open() throws TTransportException {
-      if (ugi == null) {
-        baseOpen();
-      } else {
-        try {
-          if (ugi.isFromKeytab()) {
-            ugi.checkTGTAndReloginFromKeytab();
-          }
-          ugi.doAs(new PrivilegedExceptionAction<Void>() {
-            public Void run() throws TTransportException {
-              baseOpen();
-              return null;
-            }
-          });
-        } catch (IOException e) {
-          throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
-        } catch (InterruptedException e) {
-          throw new TTransportException(
-            "Interrupted while opening underlying transport: " + e.getMessage(), e);
-        }
-      }
-    }
-
-    private void baseOpen() throws TTransportException {
-      super.open();
-    }
-  }
-
-  /**
-   * Initialize the object based on the sentry configuration provided.
-   * List of configured servers are reordered randomly preventing all
-   * clients connecting to the same server.
-   *
-   * @param conf Sentry configuration
-   * @param type Type indicates the service type
-   */
-  public SentryServiceClientTransportDefaultImpl(Configuration conf,
-                                                 sentryClientType type) throws IOException {
-
-    this.conf = conf;
-    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
-    serverPrincipalParts = null;
-    if (type == sentryClientType.POLICY_CLIENT) {
-      transportConfig = new SentryPolicyClientTransportConfig();
-    } else {
-      transportConfig = new SentryHDFSClientTransportConfig();
-    }
-
-    try {
-      String hostsAndPortsStr;
-      this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
-      this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
-      this.rpcRetryTotal = transportConfig.getSentryRpcRetryTotal(conf);
-      this.kerberos = transportConfig.isKerberosEnabled(conf);
-
-      hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
-
-      int serverPort = transportConfig.getServerRpcPort(conf);
-
-      String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
-      HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
-
-      this.endpoints = new ArrayList(hostsAndPortsStrArr.length);
-      for (HostAndPort endpoint : hostsAndPorts) {
-        this.endpoints.add(
-          new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
-        LOGGER.debug("Added server endpoint: " + endpoint.toString());
-      }
-
-      // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
-      // at the same time after a node failure.
-      Collections.shuffle(endpoints);
-      serverAddress = null;
-      connectWithRetry(false);
-    } catch (Exception e) {
-      throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Initialize object based on the parameters provided provided.
-   *
-   * @param addr Host address which the client needs to connect
-   * @param port Host Port which the client needs to connect
-   * @param conf Sentry configuration
-   * @param type Type indicates the service type
-   */
-  public SentryServiceClientTransportDefaultImpl(String addr, int port, Configuration conf,
-                                                 sentryClientType type) throws IOException {
-    // copy the configuration because we may make modifications to it.
-    this.conf = new Configuration(conf);
-    serverPrincipalParts = null;
-    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
-    if (type == sentryClientType.POLICY_CLIENT) {
-      transportConfig = new SentryPolicyClientTransportConfig();
-    } else {
-      transportConfig = new SentryHDFSClientTransportConfig();
-    }
-
-    try {
-      InetSocketAddress serverAddress = NetUtils.createSocketAddr(addr, port);
-      this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
-      this.rpcRetryTotal = transportConfig.getSentryRpcRetryTotal(conf);
-      this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
-      this.kerberos = transportConfig.isKerberosEnabled(conf);
-      connect(serverAddress);
-    } catch (MissingConfigurationException e) {
-      throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
-    }
-    endpoints = null;
-  }
-
-
-  /**
-   * no-op when already connected.
-   * On connection error, Iterates through all the configured servers and tries to connect.
-   * On successful connection, control returns
-   * On connection failure, continues iterating through all the configured sentry servers,
-   * and then retries the whole server list no more than connectionFullRetryTotal times.
-   * In this case, it won't introduce more latency when some server fails.
-   * <p>
-   * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
-   */
-  public synchronized void connectWithRetry(boolean tryAlternateServer) throws IOException {
-    if (isConnected() && (!tryAlternateServer)) {
-      return;
-    }
-
-    IOException currentException = null;
-    for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
-      try {
-        connectToAvailableServer();
-        return;
-      } catch (IOException e) {
-        currentException = e;
-        LOGGER.error(
-          String.format("Failed to connect to all the configured sentry servers, " +
-            "Retrying again"));
-      }
-    }
-    // Throw exception as reaching the max full connectWithRetry number.
-    LOGGER.error(
-      String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
-      currentException);
-    throw currentException;
-  }
-
-  /**
-   * Iterates through all the configured servers and tries to connect.
-   * On connection error, tries to connect to next server.
-   * Control returns on successful connection OR it's done trying to all the
-   * configured servers.
-   *
-   * @throws IOException
-   */
-  private void connectToAvailableServer() throws IOException {
-    IOException currentException = null;
-    if (endpoints.size() == 1) {
-      connect(endpoints.get(0));
-      return;
-    }
-
-    for (InetSocketAddress addr : endpoints) {
-      try {
-        serverAddress = addr;
-        connect(serverAddress);
-        LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString()));
-        return;
-      } catch (IOException e) {
-        LOGGER.error(String.format("Failed connection to %s: %s",
-          addr.toString(), e.getMessage()), e);
-        currentException = e;
-      }
-    }
-    throw currentException;
-  }
-
-  /**
-   * Connect to the specified socket address and throw IOException if failed.
-   *
-   * @param serverAddress Address client needs to connect
-   * @throws Exception if there is failure in establishing the connection.
-   */
-  protected void connect(InetSocketAddress serverAddress) throws IOException {
-    try {
-      transport = createTransport(serverAddress);
-      transport.open();
-    } catch (TTransportException e) {
-      throw new IOException("Failed to open transport: " + e.getMessage(), e);
-    } catch (MissingConfigurationException e) {
-      throw new RuntimeException(e.getMessage(), e);
-    }
-
-    LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress);
-  }
-
-  /**
-   * New socket is is created
-   *
-   * @param serverAddress
-   * @return
-   * @throws TTransportException
-   * @throws MissingConfigurationException
-   * @throws IOException
-   */
-  private TTransport createTransport(InetSocketAddress serverAddress)
-    throws TTransportException, MissingConfigurationException, IOException {
-    TTransport socket = new TSocket(serverAddress.getHostName(),
-      serverAddress.getPort(), connectionTimeout);
-
-    if (kerberos) {
-      String serverPrincipal = transportConfig.getSentryPrincipal(conf);
-      serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
-      LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
-      if (serverPrincipalParts == null) {
-        serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
-        Preconditions.checkArgument(serverPrincipalParts.length == 3,
-          "Kerberos principal should have 3 parts: " + serverPrincipal);
-      }
-
-      boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
-      return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
-        serverPrincipalParts[0], serverPrincipalParts[1],
-        socket, wrapUgi, conf);
-    } else {
-      return socket;
-    }
-  }
-
-  private boolean isConnected() {
-    return transport != null && transport.isOpen();
-  }
-
-  public synchronized void close() {
-    if (isConnected()) {
-      transport.close();
-    }
-  }
-
-  public int getRetryCount() {
-    return rpcRetryTotal;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java
new file mode 100644
index 0000000..3374489
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import java.io.IOException;
+
+/**
+ * General representation of transport connection to Sentry
+ */
+public interface SentrySocket extends AutoCloseable {
+  /**
+   * Connect to the Sentry server
+   * @throws IOException
+   */
+  void connect() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
new file mode 100644
index 0000000..74ac92d
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.core.common.exception.MissingConfigurationException;
+import org.apache.sentry.core.common.utils.ThriftUtil;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Generate Thrift transports suitable for talking to Sentry
+ */
+public final class SentryTransportFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class);
+
+  private final Configuration conf;
+  private final SentryClientTransportConfigInterface transportConfig;
+  private final ArrayList<InetSocketAddress> endpoints;
+
+  public SentryTransportFactory(Configuration conf,
+                                SentryClientTransportConfigInterface configInterface) {
+    this.conf = conf;
+    this.transportConfig = configInterface;
+    String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
+    int serverPort = transportConfig.getServerRpcPort(conf);
+
+    String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+    HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
+    this.endpoints = new ArrayList<>(hostsAndPortsStrArr.length);
+    for (HostAndPort endpoint : hostsAndPorts) {
+      this.endpoints.add(
+              new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
+      LOGGER.debug("Added server endpoint: " + endpoint.toString());
+    }
+    // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
+    // at the same time after a node failure.
+    if (endpoints.size() > 1) {
+      Collections.shuffle(endpoints);
+    }
+  }
+
+  /**
+   * This transport wraps the Sasl transports to set up the right UGI context for open().
+   */
+  private static final class UgiSaslClientTransport extends TSaslClientTransport {
+    private static final ImmutableMap<String, String> SASL_PROPERTIES =
+            ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
+
+    private UserGroupInformation ugi = null;
+
+    private UgiSaslClientTransport(String mechanism, String protocol,
+                                   String serverName, TTransport transport,
+                                   boolean wrapUgi, Configuration conf)
+            throws IOException, SaslException {
+      super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
+              transport);
+      if (wrapUgi) {
+        // If we don't set the configuration, the UGI will be created based on
+        // what's on the classpath, which may lack the kerberos changes we require
+        UserGroupInformation.setConfiguration(conf);
+        ugi = UserGroupInformation.getLoginUser();
+      }
+    }
+
+    // open the SASL transport with using the current UserGroupInformation
+    // This is needed to get the current login context stored
+    @Override
+    public void open() throws TTransportException {
+      if (ugi == null) {
+        baseOpen();
+      } else {
+        try {
+          if (ugi.isFromKeytab()) {
+            ugi.checkTGTAndReloginFromKeytab();
+          }
+          ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            public Void run() throws TTransportException {
+              baseOpen();
+              return null;
+            }
+          });
+        } catch (IOException e) {
+          throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
+        } catch (InterruptedException e) {
+          throw new TTransportException(
+                  "Interrupted while opening underlying transport: " + e.getMessage(), e);
+        }
+      }
+    }
+
+    private void baseOpen() throws TTransportException {
+      super.open();
+    }
+  }
+
+  /**
+   * On connection error, Iterates through all the configured servers and tries to connect.
+   * On successful connection, control returns
+   * On connection failure, continues iterating through all the configured sentry servers,
+   * and then retries the whole server list no more than connectionFullRetryTotal times.
+   * In this case, it won't introduce more latency when some server fails.
+   * <p>
+   * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
+   */
+  public TTransport connect() throws IOException {
+    int connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
+    IOException currentException = null;
+    for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
+      try {
+        return connectToAvailableServer();
+      } catch (IOException e) {
+        currentException = e;
+        LOGGER.error(
+                String.format("Failed to connect to all the configured sentry servers, " +
+                        "Retrying again"));
+      }
+    }
+    // Throw exception as reaching the max full connectWithRetry number.
+    LOGGER.error(
+            String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
+            currentException);
+      throw currentException;
+  }
+
+  /**
+   * Iterates through all the configured servers and tries to connect.
+   * On connection error, tries to connect to next server.
+   * Control returns on successful connection OR it's done trying to all the
+   * configured servers.
+   *
+   * @throws IOException
+   */
+  private TTransport connectToAvailableServer() throws IOException {
+    IOException currentException = null;
+    for (InetSocketAddress addr : endpoints) {
+      try {
+        return connect(addr);
+      } catch (IOException e) {
+        LOGGER.error(String.format("Failed connection to %s: %s",
+                addr.toString(), e.getMessage()), e);
+        currentException = e;
+      }
+    }
+    if (currentException != null) {
+      throw currentException;
+    }
+    return null;
+  }
+
+  /**
+   * Connect to the specified socket address and throw IOException if failed.
+   *
+   * @param serverAddress Address client needs to connect
+   * @throws Exception if there is failure in establishing the connection.
+   */
+  protected TTransport connect(InetSocketAddress serverAddress) throws IOException {
+    try {
+      TTransport transport = createTransport(serverAddress);
+      transport.open();
+      LOGGER.info(String.format("Connected to SentryServer: %s", serverAddress));
+      return transport;
+    } catch (TTransportException e) {
+      throw new IOException("Failed to open transport: " + e.getMessage(), e);
+    } catch (MissingConfigurationException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * New socket is is created
+   *
+   * @param serverAddress
+   * @return
+   * @throws TTransportException
+   * @throws MissingConfigurationException
+   * @throws IOException
+   */
+  private TTransport createTransport(InetSocketAddress serverAddress)
+          throws TTransportException, MissingConfigurationException, IOException {
+    TTransport socket = new TSocket(serverAddress.getHostName(),
+            serverAddress.getPort(), transportConfig.getServerRpcConnTimeoutInMs(conf));
+
+    if (!transportConfig.isKerberosEnabled(conf)) {
+      return socket;
+    }
+
+    String serverPrincipal = transportConfig.getSentryPrincipal(conf);
+    serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+    LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
+    String[] serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
+    Preconditions.checkArgument(serverPrincipalParts.length == 3,
+            "Kerberos principal should have 3 parts: " + serverPrincipal);
+
+    boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
+    return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
+            serverPrincipalParts[0], serverPrincipalParts[1],
+            socket, wrapUgi, conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
index faac053..11f6894 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
@@ -18,9 +18,8 @@
 package org.apache.sentry.hdfs;
 
 import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
 
-public interface SentryHDFSServiceClient extends SentryServiceClient {
+public interface SentryHDFSServiceClient {
   String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";
 
   void notifyHMSUpdate(PathsUpdate update)
@@ -30,5 +29,7 @@ public interface SentryHDFSServiceClient extends SentryServiceClient {
 
   SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
       throws SentryHdfsServiceException;
+
+  void close();
 }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
index d337319..794aded 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
@@ -18,12 +18,13 @@
 package org.apache.sentry.hdfs;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.LinkedList;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentrySocket;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
 import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
 import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
 import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
@@ -34,6 +35,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TMultiplexedProtocol;
 import org.apache.thrift.protocol.TProtocol;
 
+import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,28 +49,41 @@ import org.slf4j.LoggerFactory;
 */
 
 
-public class SentryHDFSServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryHDFSServiceClient {
+public class SentryHDFSServiceClientDefaultImpl
+        implements SentryHDFSServiceClient, SentrySocket {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
+  private final Configuration conf;
 
   private Client client;
+  private SentryTransportFactory transportFactory;
+  private TTransport transport;
 
-  public SentryHDFSServiceClientDefaultImpl(Configuration conf) throws IOException {
-    super(conf, sentryClientType.HDFS_CLIENT);
+
+
+  SentryHDFSServiceClientDefaultImpl(Configuration conf,
+                                     SentryClientTransportConfigInterface transportConfig)
+          throws IOException {
+    this.conf = conf;
+    transportFactory = new SentryTransportFactory(conf, transportConfig);
   }
 
   /**
    * Connect to the specified socket address and then use the new socket
    * to construct new thrift client.
    *
-   * @param serverAddress: socket address to which the client should connect.
    * @throws IOException
    */
-  public void connect(InetSocketAddress serverAddress) throws IOException {
-    TProtocol tProtocol = null;
-    super.connect(serverAddress);
+  @Override
+  public void connect() throws IOException {
+    if (isOpen()) {
+      return;
+    }
+
+    transport = transportFactory.connect();
     long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
             ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+    TProtocol tProtocol = null;
     if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT,
             ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
       tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize);
@@ -119,4 +134,14 @@ public class SentryHDFSServiceClientDefaultImpl extends SentryServiceClientTrans
     }
     return retVal;
   }
+
+  @Override
+  public void close() {
+    transport.close();
+    transport = null;
+  }
+
+  private boolean isOpen() {
+    return ((transport != null) && transport.isOpen());
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
index 59ac360..174da4f 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
@@ -21,12 +21,16 @@ import java.lang.reflect.Proxy;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
 
 /**
  * Client factory to create normal client or proxy with HA invocation handler
  */
 public class SentryHDFSServiceClientFactory {
-    
+  private static final SentryClientTransportConfigInterface transportConfig =
+          new SentryHDFSClientTransportConfig();
+
   private SentryHDFSServiceClientFactory() {
     // Make constructor private to avoid instantiation
   }
@@ -36,7 +40,8 @@ public class SentryHDFSServiceClientFactory {
     return (SentryHDFSServiceClient) Proxy
       .newProxyInstance(SentryHDFSServiceClientDefaultImpl.class.getClassLoader(),
         SentryHDFSServiceClientDefaultImpl.class.getInterfaces(),
-        new RetryClientInvocationHandler(conf,
-          new SentryHDFSServiceClientDefaultImpl(conf)));
+        new RetryClientInvocationHandler(conf, transportConfig,
+          new SentryHDFSServiceClientDefaultImpl(conf,
+                  transportConfig)));
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
index c832706..11cdee7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
@@ -24,9 +24,8 @@ import java.util.Set;
 import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
 
-public interface SentryGenericServiceClient extends SentryServiceClient {
+public interface SentryGenericServiceClient {
 
   /**
    * Create a sentry role
@@ -192,4 +191,6 @@ public interface SentryGenericServiceClient extends SentryServiceClient {
   Map<String, TSentryPrivilegeMap> listPrivilegsbyAuthorizable(String component,
       String serviceName, String requestorUserName, Set<String> authorizablesSet,
       Set<String> groups, ActiveRoleSet roleSet) throws SentryUserException;
+
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
index 9bbd736..c9d0357 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
@@ -18,18 +18,16 @@
 package org.apache.sentry.provider.db.generic.service.thrift;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-
 import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
-import org.apache.sentry.core.common.utils.SentryConstants;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentrySocket;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
 import org.apache.sentry.core.model.db.AccessConstants;
 import org.apache.sentry.service.thrift.ServiceConstants;
 import org.apache.sentry.service.thrift.Status;
@@ -38,6 +36,7 @@ import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TMultiplexedProtocol;
 
+import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,30 +51,48 @@ import com.google.common.collect.Lists;
  So it is important to close and re-open the transport so that new socket is used.
  */
 
-public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryGenericServiceClient {
+public class SentryGenericServiceClientDefaultImpl
+        implements SentryGenericServiceClient, SentrySocket {
+  private final SentryTransportFactory transportFactory;
+  private final Configuration conf;
+  private TTransport transport;
+
+
   private SentryGenericPolicyService.Client client;
   private static final Logger LOGGER = LoggerFactory
     .getLogger(SentryGenericServiceClientDefaultImpl.class);
   private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occured ";
 
-  public SentryGenericServiceClientDefaultImpl(Configuration conf) throws IOException {
-    super(conf, sentryClientType.POLICY_CLIENT);
+  public SentryGenericServiceClientDefaultImpl(Configuration conf,
+                                               SentryClientTransportConfigInterface transportConfig)
+          throws IOException {
+    this.conf = conf;
+    transportFactory = new SentryTransportFactory(conf, transportConfig);
+
+    // TODO - do it correctly
+    /*
     if (kerberos) {
       // since the client uses hadoop-auth, we need to set kerberos in
       // hadoop-auth if we plan to use kerberos
       conf.set(HADOOP_SECURITY_AUTHENTICATION, SentryConstants.KERBEROS_MODE);
     }
+    */
   }
 
   /**
    * Connect to the specified socket address and then use the new socket
    * to construct new thrift client.
    *
-   * @param serverAddress: socket address to which the client should connect.
    * @throws IOException
    */
-  public void connect(InetSocketAddress serverAddress) throws IOException {
-    super.connect(serverAddress);
+  @Override
+  public void connect() throws IOException {
+    if (isOpen()) {
+      return;
+    }
+
+    transport = transportFactory.connect();
+
     long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
             ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
     TMultiplexedProtocol protocol = new TMultiplexedProtocol(
@@ -84,6 +101,12 @@ public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTr
     client = new SentryGenericPolicyService.Client(protocol);
     LOGGER.debug("Successfully created client");
   }
+
+  private boolean isOpen() {
+    return ((transport != null) && transport.isOpen());
+  }
+
+
   /**
    * Create a sentry role
    *
@@ -506,4 +529,9 @@ public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTr
       throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e);
     }
   }
+
+  @Override
+  public void close() {
+    transport.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
index 1c582f0..9132449 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
@@ -19,6 +19,8 @@ package org.apache.sentry.provider.db.generic.service.thrift;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
 
 import java.lang.reflect.Proxy;
 
@@ -26,6 +28,8 @@ import java.lang.reflect.Proxy;
  * SentryGenericServiceClientFactory is a public class for the components which using Generic Model to create sentry client.
  */
 public final class SentryGenericServiceClientFactory {
+  private static final SentryClientTransportConfigInterface transportConfig =
+          new SentryPolicyClientTransportConfig();
 
   private SentryGenericServiceClientFactory() {
   }
@@ -34,8 +38,8 @@ public final class SentryGenericServiceClientFactory {
     return (SentryGenericServiceClient) Proxy
       .newProxyInstance(SentryGenericServiceClientDefaultImpl.class.getClassLoader(),
         SentryGenericServiceClientDefaultImpl.class.getInterfaces(),
-        new RetryClientInvocationHandler(conf,
-          new SentryGenericServiceClientDefaultImpl(conf)));
+        new RetryClientInvocationHandler(conf, transportConfig,
+          new SentryGenericServiceClientDefaultImpl(conf, transportConfig)));
   }
     
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
index 28c3e35..3b25db7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
@@ -25,9 +25,8 @@ import java.util.Set;
 import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
 
-public interface SentryPolicyServiceClient extends SentryServiceClient {
+public interface SentryPolicyServiceClient  {
 
   void createRole(String requestorUserName, String roleName) throws SentryUserException;
 
@@ -216,4 +215,6 @@ public interface SentryPolicyServiceClient extends SentryServiceClient {
   // export the sentry mapping data with map structure
   Map<String, Map<String, Set<String>>> exportPolicy(String requestorUserName, String objectPath)
       throws SentryUserException;
+
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index b4c1a5f..9eb60cc 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -19,7 +19,6 @@
 package org.apache.sentry.provider.db.service.thrift;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,6 +30,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentrySocket;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
 import org.apache.sentry.core.model.db.AccessConstants;
 import org.apache.sentry.core.model.db.DBModelAuthorizable;
 import org.apache.sentry.core.common.utils.PolicyFileConstants;
@@ -42,7 +44,7 @@ import org.apache.sentry.service.thrift.Status;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TMultiplexedProtocol;
-import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
+import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,35 +67,43 @@ import com.google.common.collect.Sets;
  server this is configured.
 */
 
-public class SentryPolicyServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryPolicyServiceClient {
+public class SentryPolicyServiceClientDefaultImpl
+        implements SentryPolicyServiceClient, SentrySocket {
+
+  private final Configuration conf;
 
   private SentryPolicyService.Client client;
   private static final Logger LOGGER = LoggerFactory
           .getLogger(SentryPolicyServiceClient.class);
+  private SentryTransportFactory transportFactory;
+  private TTransport transport;
+
+
   private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred ";
 
   /**
    * Initialize the sentry configurations.
    */
-  public SentryPolicyServiceClientDefaultImpl(Configuration conf)
+  public SentryPolicyServiceClientDefaultImpl(Configuration conf,
+                                              SentryClientTransportConfigInterface transportConfig)
           throws IOException {
-    super(conf, sentryClientType.POLICY_CLIENT);
-  }
-
-  public SentryPolicyServiceClientDefaultImpl(String addr, int port,
-                                              Configuration conf) throws IOException {
-    super(addr, port, conf, sentryClientType.POLICY_CLIENT);
+    this.conf = conf;
+    this.transportFactory = new SentryTransportFactory(conf, transportConfig);
   }
 
   /**
    * Connect to the specified socket address and then use the new socket
    * to construct new thrift client.
    *
-   * @param serverAddress: socket address to which the client should connect.
    * @throws IOException
    */
-  public void connect(InetSocketAddress serverAddress) throws IOException {
-    super.connect(serverAddress);
+  @Override
+  public void connect() throws IOException {
+    if (isOpen()) {
+      return;
+    }
+    transport = transportFactory.connect();
+
     long maxMessageSize = conf.getLong(
             ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
             ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
@@ -1008,4 +1018,14 @@ public class SentryPolicyServiceClientDefaultImpl extends SentryServiceClientTra
     }
     return rolePrivilegesMapForFile;
   }
+
+  @Override
+  public void close() {
+    transport.close();
+    transport = null;
+  }
+
+  private boolean isOpen() {
+    return ((transport != null) && transport.isOpen());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
index 745dc4c..55c51d3 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
@@ -23,29 +23,24 @@ import java.lang.reflect.Proxy;
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
 
 public final class SentryServiceClientFactory {
+  private static final SentryClientTransportConfigInterface transportConfig =
+          new SentryPolicyClientTransportConfig();
+
 
   private SentryServiceClientFactory() {
   }
 
   public static SentryPolicyServiceClient create(Configuration conf) throws Exception {
-    boolean pooled = conf.getBoolean(
-        ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT);
-    if (pooled) {
-      return (SentryPolicyServiceClient) Proxy
-          .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
-              SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
-              new PoolClientInvocationHandler(conf));
-    } else {
-      return (SentryPolicyServiceClient) Proxy
-          .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
-              SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
-            new RetryClientInvocationHandler(conf,
-              new SentryPolicyServiceClientDefaultImpl(conf)));
-    }
+    return (SentryPolicyServiceClient) Proxy
+        .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
+            SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
+          new RetryClientInvocationHandler(conf, transportConfig,
+            new SentryPolicyServiceClientDefaultImpl(conf, transportConfig)));
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
index 0164fa6..dd13e0d 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
@@ -23,9 +23,9 @@ import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
 /**
  * SentryServiceClientPoolFactory is for connection pool to manage the object. Implement the related
@@ -36,21 +36,21 @@ public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<Sent
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class);
 
-  private final String addr;
-  private final int port;
-  private final Configuration conf;
+  //private final String addr;
+  //private final int port;
+  //private final Configuration conf;
 
   public SentryServiceClientPoolFactory(String addr, int port,
                                         Configuration conf) {
-    this.addr = addr;
-    this.port = port;
-    this.conf = conf;
+    LOGGER.debug("addr = " + addr + "port = " + String.valueOf(port) + " conf = ", conf.toString());
+    //this.addr = addr;
+    //this.port = port;
+    //this.conf = conf;
   }
 
   @Override
   public SentryPolicyServiceClient create() throws Exception {
-    LOGGER.debug("Creating Sentry Service Client...");
-    return new SentryPolicyServiceClientDefaultImpl(addr, port, conf);
+    throw new NotImplementedException();
   }
 
   @Override
@@ -60,13 +60,6 @@ public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<Sent
 
   @Override
   public void destroyObject(PooledObject<SentryPolicyServiceClient> pooledObject) {
-    SentryPolicyServiceClient client = pooledObject.getObject();
-    LOGGER.debug("Destroying Sentry Service Client: " + client);
-    if (client != null) {
-      // The close() of TSocket or TSaslClientTransport is called actually, and there has no
-      // exception even there has some problems, eg, the client is closed already.
-      // The close here is just try to close the socket and the client will be destroyed soon.
-      client.close();
-    }
+    throw new NotImplementedException();
   }
 }


Mime
View raw message