sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sra...@apache.org
Subject [1/2] sentry git commit: Sentry-1321: Implement HMSFollower in Sentry service which reads the NotificationLog entries (Sravya Tirukkovalur, Reviewed by: Hao Hao) (Append)
Date Tue, 16 Aug 2016 00:26:08 GMT
Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 2d5ed9984 -> 908072d66


http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
index a35bf1d..353d461 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
@@ -20,11 +20,13 @@ package org.apache.sentry.service.thrift;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.pool2.PooledObjectFactory;
 import org.apache.commons.pool2.impl.AbandonedConfig;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.exception.SentryStandbyException;
 import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
 import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
@@ -33,63 +35,273 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The PoolClientInvocationHandler is a proxy class for handling thrift call. For every thrift
call,
- * get the instance of SentryPolicyServiceBaseClient from the commons-pool, and return the
instance
- * to the commons-pool after complete the call. For any exception with the call, discard
the
- * instance and create a new one added to the commons-pool. Then, get the instance and do
the call
- * again. For the thread safe, the commons-pool will manage the connection pool, and every
thread
- * can get the connection by borrowObject() and return the connection to the pool by returnObject().
+ * The PoolClientInvocationHandler is a proxy class for handling thrift
+ * call. For every thrift call, get the instance of
+ * SentryPolicyServiceBaseClient from the commons-pool, and return the instance
+ * to the commons-pool after complete the call. For any exception with the call,
+ * discard the instance and create a new one added to the commons-pool. Then,
+ * get the instance and do the call again. For the thread safe, the commons-pool
+ * will manage the connection pool, and every thread can get the connection by
+ * borrowObject() and return the connection to the pool by returnObject().
  */
 
 public class PoolClientInvocationHandler extends SentryClientInvocationHandler {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PoolClientInvocationHandler.class);
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PoolClientInvocationHandler.class);
+  private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occurred ";
 
   private final Configuration conf;
-  private PooledObjectFactory<SentryPolicyServiceClient> poolFactory;
-  private GenericObjectPool<SentryPolicyServiceClient> pool;
-  private GenericObjectPoolConfig poolConfig;
-  private int connectionRetryTotal;
 
-  private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occured ";
+  /**
+   * The configuration to use for our object pools.
+   * Null if we are not using object pools.
+   */
+  private final GenericObjectPoolConfig poolConfig;
+
+  /**
+   * The total number of connection retries to attempt per endpoint.
+   */
+  private final int connectionRetryTotal;
+
+  /**
+   * The configured sentry servers.
+   */
+  private final Endpoint[] endpoints;
+
+  /**
+   * The endpoint which we are currently using.  This can be read without any locks.
+   * It must be written while holding the endpoints lock.
+   */
+  private volatile int freshestEndpointIdx = 0;
+
+  private class Endpoint {
+    /**
+     * The server address or hostname.
+     */
+    private final String addr;
+
+    /**
+     * The server port.
+     */
+    private final int port;
+
+    /**
+     * The server's poolFactory used to create new clients.
+     */
+    private final PooledObjectFactory<SentryPolicyServiceClient> poolFactory;
+
+    /**
+     * The server's pool of cached clients.
+     */
+    private final GenericObjectPool<SentryPolicyServiceClient> pool;
+
+    Endpoint(String addr, int port) {
+      this.addr = addr;
+      this.port = port;
+      this.poolFactory = new SentryServiceClientPoolFactory(addr, port, conf);
+      this.pool = new GenericObjectPool<SentryPolicyServiceClient>(
+          this.poolFactory, poolConfig, new AbandonedConfig());
+    }
+
+    GenericObjectPool<SentryPolicyServiceClient> getPool() {
+      return pool;
+    }
+
+    String getEndPointStr() {
+      return new String("endpoint at [address " + addr + ", port " + port + "]");
+    }
+  }
 
   public PoolClientInvocationHandler(Configuration conf) throws Exception {
     this.conf = conf;
-    readConfiguration();
-    poolFactory = new SentryServiceClientPoolFactory(conf);
-    pool = new GenericObjectPool<SentryPolicyServiceClient>(poolFactory, poolConfig,
new AbandonedConfig());
+
+    this.poolConfig = new GenericObjectPoolConfig();
+    // config the pool size for commons-pool
+    this.poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL,
+        ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT));
+    this.poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE,
+        ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT));
+    this.poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE,
+        ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT));
+
+    // get the retry number for reconnecting service
+    this.connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL,
+        ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT);
+
+    String hostsAndPortsStr = conf.get(ClientConfig.SERVER_RPC_ADDRESS);
+    if (hostsAndPortsStr == null) {
+      throw new RuntimeException("Config key " +
+          ClientConfig.SERVER_RPC_ADDRESS + " is required");
+    }
+    int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT,
+        ClientConfig.SERVER_RPC_PORT_DEFAULT);
+    String[] hostsAndPorts = hostsAndPortsStr.split(",");
+    String[] hosts = new String[hostsAndPorts.length];
+    int[] ports = new int[hostsAndPorts.length];
+    parseHostPortStrings(hostsAndPortsStr, hostsAndPorts, hosts,
+        ports, defaultPort);
+    this.endpoints = new Endpoint[hostsAndPorts.length];
+    for (int i = 0; i < this.endpoints.length; i++) {
+      this.endpoints[i] = new Endpoint(hosts[i], ports[i]);
+      LOGGER.info("Initiate sentry sever endpoint: hostname " + hosts[i] + ", port " + ports[i]);
+    }
+  }
+
+  @VisibleForTesting
+  /**
+   * Utility function for parsing host and port strings. Expected form should be
+   * (host:port). The hostname could be in ipv6 style. Port number can be empty
+   * and will be default to defaultPort.
+   */
+  static protected void parseHostPortStrings(String hostsAndPortsStr,
+        String[] hostsAndPorts, String[] hosts, int[] ports,
+        int defaultPort) {
+    int i = -1;
+    for (String hostAndPort: hostsAndPorts) {
+      i++;
+      hostAndPort = hostAndPort.trim();
+      if (hostAndPort.isEmpty()) {
+        throw new RuntimeException("Cannot handle empty server RPC address " +
+            "in component " + (i + 1) + " of " + hostsAndPortsStr);
+      }
+      int colonIdx = hostAndPort.lastIndexOf(":");
+      if (colonIdx == -1) {
+        // There is no colon in the host+port string.
+        // That means the port is left unspecified, and should be set to
+        // the default.
+        hosts[i] = hostAndPort;
+        ports[i] = defaultPort;
+        continue;
+      }
+      int rightBracketIdx = hostAndPort.indexOf(']', colonIdx);
+      if (rightBracketIdx != -1) {
+        // If there is a right bracket that occurs after the colon, the
+        // colon we found is part of an ipv6 address like this:
+        // [::1].  That means we only have the host part, not the port part.
+        hosts[i] = hostAndPort.substring(0, rightBracketIdx);
+        ports[i] = defaultPort;
+        continue;
+      }
+      // We have a host:port string, where the part after colon should be
+      // the port.
+      hosts[i] = hostAndPort.substring(0, colonIdx);
+      String portStr = hostAndPort.substring(colonIdx+1);
+      try {
+        ports[i] = Integer.valueOf(portStr);
+      } catch (NumberFormatException e) {
+        throw new RuntimeException("Cannot parse port string " + portStr +
+            "in component " + (i + 1) + " of " + hostsAndPortsStr);
+      }
+      if ((ports[i] < 0) || (ports[i] > 65535)) {
+        throw new RuntimeException("Invalid port number given for " + portStr +
+            "in component " + (i + 1) + " of " + hostsAndPortsStr);
+      }
+    }
+    // Strip the brackets off of hostnames and ip addresses enclosed in square
+    // brackets.  This is to support ipv6-style [address]:port addressing.
+    for (int j = 0; j < hosts.length; j++) {
+      if ((hosts[j].startsWith("[")) && (hosts[j].endsWith("]"))) {
+        hosts[j] = hosts[j].substring(1, hosts[j].length() - 1);
+      }
+    }
   }
 
   @Override
-  public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
+  public Object invokeImpl(Object proxy, Method method, Object[] args)
+      throws Exception {
     int retryCount = 0;
-    Object result = null;
-    while (retryCount < connectionRetryTotal) {
+    /**
+     * The maximum number of retries that we will do.  Each endpoint gets its
+     * own set of retries.
+     */
+    int retryLimit = connectionRetryTotal * endpoints.length;
+
+    /**
+     * The index of the endpoint to use.
+     */
+    int endpointIdx = freshestEndpointIdx;
+
+    /**
+     * A list of exceptions from each endpoint.  This starts as null to avoid
+     * memory allocation in the common case where there is no error.
+     */
+    Exception exc[] = null;
+
+    Object ret = null;
+
+    while (retryCount < retryLimit) {
+      GenericObjectPool<SentryPolicyServiceClient> pool =
+          endpoints[endpointIdx].getPool();
       try {
-        // The wapper here is for the retry of thrift call, the default retry number is 3.
-        result = invokeFromPool(method, args);
+        if ((exc != null) &&
+            (exc[endpointIdx] instanceof TTransportException)) {
+          // If there was a TTransportException last time we tried to contact
+          // this endpoint, attempt to create a new connection before we try
+          // again.
+          synchronized (endpoints) {
+            // If there has room, create new instance and add it to the
+            // commons-pool.  This instance will be returned first from the
+            // commons-pool, because the configuration is LIFO.
+            if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) {
+              pool.addObject();
+            }
+          }
+        }
+        // Try to make the RPC.
+        ret = invokeFromPool(method, args, pool);
         break;
-      } catch (TTransportException e) {
-        // TTransportException means there has connection problem, create a new connection
and try
-        // again. Get the lock of pool and add new connection.
-        synchronized (pool) {
-          // If there has room, create new instance and add it to the commons-pool, this
instance
-          // will be back first from the commons-pool because the configuration is LIFO.
-          if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) {
-            pool.addObject();
+      } catch (SentryStandbyException | TTransportException e) {
+        if (exc == null) {
+          exc = new Exception[endpoints.length];
+        }
+        exc[endpointIdx] = e;
+      }
+
+      Exception lastExc = exc[endpointIdx];
+      synchronized (endpoints) {
+        int curFreshestEndpointIdx = freshestEndpointIdx;
+        if (curFreshestEndpointIdx == endpointIdx) {
+          curFreshestEndpointIdx =
+              (curFreshestEndpointIdx  + 1) %  endpoints.length;
+          freshestEndpointIdx = curFreshestEndpointIdx;
+        }
+        endpointIdx = curFreshestEndpointIdx;
+      }
+      // Increase the retry num, and throw the exception if can't retry again.
+      retryCount++;
+      if (retryCount == connectionRetryTotal) {
+        boolean allStandby = true, allUnreachable = true;
+        for (int i = 0; i < exc.length; i++) {
+          if (exc[i] instanceof SentryStandbyException) {
+            allUnreachable = false;
+            LOGGER.error("Sentry server " + endpoints[endpointIdx].getEndPointStr()
+                + " is in standby mode");
+          } else {
+            allStandby = false;
+            LOGGER.error("Sentry server " + endpoints[endpointIdx].getEndPointStr()
+                + " is in unreachable.");
           }
         }
-        // Increase the retry num, and throw the exception if can't retry again.
-        retryCount++;
-        if (retryCount == connectionRetryTotal) {
-          throw new SentryUserException(e.getMessage(), e);
+        if (allStandby) {
+          throw new SentryStandbyException("All sentry servers are in " +
+              "standby mode.", lastExc);
+        } else if (allUnreachable) {
+          throw new SentryUserException("All sentry servers are unreachable. " +
+              "Diagnostics is needed for unreachable servers.",
+              lastExc);
+        } else {
+          throw new SentryUserException("All reachable servers are standby. " +
+              "Diagnostics is needed for unreachable servers.",
+              lastExc);
         }
       }
     }
-    return result;
+    return ret;
   }
 
-  private Object invokeFromPool(Method method, Object[] args) throws Exception {
+  private Object invokeFromPool(Method method, Object[] args,
+      GenericObjectPool<SentryPolicyServiceClient> pool) throws Exception {
     Object result = null;
     SentryPolicyServiceClient client;
     try {
@@ -106,7 +318,9 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler
{
       // Get the target exception, check if SentryUserException or TTransportException is
wrapped.
       // TTransportException means there has connection problem with the pool.
       Throwable targetException = e.getCause();
-      if (targetException instanceof SentryUserException) {
+      if (targetException instanceof SentryStandbyException) {
+        throw (SentryStandbyException)targetException;
+      } else if (targetException instanceof SentryUserException) {
         Throwable sentryTargetException = targetException.getCause();
         // If there has connection problem, eg, invalid connection if the service restarted,
         // sentryTargetException instanceof TTransportException = true.
@@ -134,21 +348,12 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler
{
 
   @Override
   public void close() {
-    try {
-      pool.close();
-    } catch (Exception e) {
-      LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
+    for (int i = 0; i < endpoints.length; i++) {
+      try {
+        endpoints[i].getPool().close();
+      } catch (Exception e) {
+        LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
+      }
     }
   }
-
-  private void readConfiguration() {
-    poolConfig = new GenericObjectPoolConfig();
-    // config the pool size for commons-pool
-    poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL, ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT));
-    poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE, ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT));
-    poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE, ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT));
-    // get the retry number for reconnecting service
-    connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL,
-        ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT);
-  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
index 56d774b..9e90af8 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration;
 
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
 
 public final class SentryServiceClientFactory {
 
@@ -32,15 +31,9 @@ public final class SentryServiceClientFactory {
   }
 
   public static SentryPolicyServiceClient create(Configuration conf) throws Exception {
-    boolean pooled = conf.getBoolean(ClientConfig.SENTRY_POOL_ENABLED, false);
-    if (pooled) {
       return (SentryPolicyServiceClient) Proxy
           .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
-              SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
-              new PoolClientInvocationHandler(conf));
-    } else {
-      return new SentryPolicyServiceClientDefaultImpl(conf);
-    }
+          SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
+          new PoolClientInvocationHandler(conf));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
index afea78a..0164fa6 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
@@ -36,16 +36,21 @@ public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<Sent
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class);
 
-  private Configuration conf;
+  private final String addr;
+  private final int port;
+  private final Configuration conf;
 
-  public SentryServiceClientPoolFactory(Configuration conf) {
+  public SentryServiceClientPoolFactory(String addr, int port,
+                                        Configuration conf) {
+    this.addr = addr;
+    this.port = port;
     this.conf = conf;
   }
 
   @Override
   public SentryPolicyServiceClient create() throws Exception {
     LOGGER.debug("Creating Sentry Service Client...");
-    return new SentryPolicyServiceClientDefaultImpl(conf);
+    return new SentryPolicyServiceClientDefaultImpl(addr, port, conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
index 51bba31..d1ac447 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
@@ -58,10 +58,13 @@ public class TestSentryServiceFailureCase extends SentryServiceIntegrationBase
{
   public void testClientServerConnectionFailure()  throws Exception {
     try {
       connectToSentryService();
+      String requestorUserName = ADMIN_USER;
+      client.listRoles(requestorUserName);
       Assert.fail("Failed to receive Exception");
     } catch(Exception e) {
       LOGGER.info("Excepted exception", e);
-      Throwable cause = e.getCause();
+      // peer callback exception is nested inside SentryUserException.
+      Throwable cause = e.getCause().getCause();
       if (cause == null) {
         throw e;
       }

http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
index 15eab15..a4dd8a6 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
@@ -104,7 +104,7 @@ public class TestSentryServiceWithInvalidMsgSize extends SentryServiceIntegratio
           client.grantServerPrivilege(ADMIN_USER, ROLE_NAME, "server", false);
         } catch (SentryUserException e) {
           exceptionThrown = true;
-          Assert.assertTrue(e.getMessage().contains("org.apache.thrift.transport.TTransportException"));
+          Assert.assertTrue(e.getCause().getMessage().contains("org.apache.thrift.transport.TTransportException"));
         } finally {
           Assert.assertEquals(true, exceptionThrown);
         }

http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
new file mode 100644
index 0000000..d601b1e
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.service.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hive.hcatalog.messaging.HCatEventMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestHMSFollower {
+  SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory();
+  SentryStore sentryStore = Mockito.mock(SentryStore.class);
+  final static String hiveInstance = "server2";
+
+  @Test
+  public void testCreateDatabase() throws Exception {
+    String dbName = "db1";
+
+    // Create notification events
+    NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_DATABASE.toString(),
+        messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs://db1",
null)).toString());
+    List<NotificationEvent> events = new ArrayList<>();
+    events.add(notificationEvent);
+
+    Configuration configuration = new Configuration();
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+    hmsFollower.processNotificationEvents(events);
+
+    TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
+    authorizable.setServer(hiveInstance);
+    authorizable.setDb("db1");
+
+    verify(sentryStore, times(1)).dropPrivilege(authorizable);
+  }
+  @Test
+  public void testDropDatabase() throws Exception {
+    String dbName = "db1";
+
+    // Create notification events
+    NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_DATABASE.toString(),
+        messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs://db1",
null)).toString());
+    List<NotificationEvent> events = new ArrayList<>();
+    events.add(notificationEvent);
+
+    Configuration configuration = new Configuration();
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+    hmsFollower.processNotificationEvents(events);
+
+    TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
+    authorizable.setServer(hiveInstance);
+    authorizable.setDb("db1");
+
+    verify(sentryStore, times(1)).dropPrivilege(authorizable);
+  }
+  @Test
+  public void testCreateTable() throws Exception {
+    String dbName = "db1";
+    String tableName = "table1";
+
+    // Create notification events
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setLocation("hdfs://db1.db/table1");
+    NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_TABLE.toString(),
+        messageFactory.buildCreateTableMessage(new Table(tableName, dbName, null, 0, 0, 0,
sd, null, null, null, null, null)).toString());
+    List<NotificationEvent> events = new ArrayList<>();
+    events.add(notificationEvent);
+
+    Configuration configuration = new Configuration();
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+    hmsFollower.processNotificationEvents(events);
+
+    TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
+    authorizable.setServer(hiveInstance);
+    authorizable.setDb("db1");
+    authorizable.setTable(tableName);
+
+    verify(sentryStore, times(1)).dropPrivilege(authorizable);
+  }
+  @Test
+  public void testDropTable() throws Exception {
+    String dbName = "db1";
+    String tableName = "table1";
+
+    // Create notification events
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setLocation("hdfs://db1.db/table1");
+    NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_TABLE.toString(),
+        messageFactory.buildDropTableMessage(new Table(tableName, dbName, null, 0, 0, 0,
sd, null, null, null, null, null)).toString());
+    List<NotificationEvent> events = new ArrayList<>();
+    events.add(notificationEvent);
+
+    Configuration configuration = new Configuration();
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+    hmsFollower.processNotificationEvents(events);
+
+    TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
+    authorizable.setServer(hiveInstance);
+    authorizable.setDb("db1");
+    authorizable.setTable(tableName);
+
+    verify(sentryStore, times(1)).dropPrivilege(authorizable);
+  }
+  @Test
+  public void testRenameTable() throws Exception {
+    String dbName = "db1";
+    String tableName = "table1";
+
+    String newDbName = "db1";
+    String newTableName = "table2";
+
+    // Create notification events
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setLocation("hdfs://db1.db/table1");
+    NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(),
+        messageFactory.buildAlterTableMessage(
+            new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
+            new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null,
null)).toString());
+    notificationEvent.setDbName(newDbName);
+    notificationEvent.setTableName(newTableName);
+    List<NotificationEvent> events = new ArrayList<>();
+    events.add(notificationEvent);
+
+    Configuration configuration = new Configuration();
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+    hmsFollower.processNotificationEvents(events);
+
+    TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
+    authorizable.setServer(hiveInstance);
+    authorizable.setDb(dbName);
+    authorizable.setTable(tableName);
+
+    TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance);
+    authorizable.setServer(hiveInstance);
+    newAuthorizable.setDb(newDbName);
+    newAuthorizable.setTable(newTableName);
+
+    verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
new file mode 100644
index 0000000..5b0e12b
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestPoolClientInvocationHandler {
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(TestPoolClientInvocationHandler.class);
+
+  private void expectParseHostPortStrings(String hostsAndPortsStr,
+        String[] expectedHosts, int[] expectedPorts) throws Exception {
+    boolean success = false;
+    String[] hostsAndPorts = hostsAndPortsStr.split(",");
+    String[] hosts = new String[hostsAndPorts.length];
+    int[] ports = new int[hostsAndPorts.length];
+    try {
+      PoolClientInvocationHandler.parseHostPortStrings(hostsAndPortsStr,
+          hostsAndPorts, hosts, ports, 8038);
+      success = true;
+    } finally {
+      if (!success) {
+        LOGGER.error("Caught exception while parsing hosts/ports string " +
+            hostsAndPortsStr);
+      }
+    }
+    Assert.assertArrayEquals("Got unexpected hosts results while " +
+        "parsing " + hostsAndPortsStr, expectedHosts, hosts);
+    Assert.assertArrayEquals("Got unexpected ports results while " +
+        "parsing " + hostsAndPortsStr, expectedPorts, ports);
+  }
+
+  @SuppressWarnings("PMD.AvoidUsingHardCodedIP")
+  @Test
+  public void testParseHostPortStrings() throws Exception {
+    expectParseHostPortStrings("foo", new String[] {"foo"}, new int[] {8038});
+    expectParseHostPortStrings("foo,bar",
+        new String[] {"foo", "bar"},
+        new int[] {8038, 8038});
+    expectParseHostPortStrings("foo:2020,bar:2021",
+        new String[] {"foo", "bar"},
+        new int[] {2020, 2021});
+    expectParseHostPortStrings("127.0.0.1:2020,127.1.0.1",
+        new String[] {"127.0.0.1", "127.1.0.1"},
+        new int[] {2020, 8038});
+    expectParseHostPortStrings("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:433",
+        new String[] {"2001:db8:85a3:8d3:1319:8a2e:370:7348"},
+        new int[] {433});
+  }
+}


Mime
View raw message