sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ha...@apache.org
Subject sentry git commit: SENTRY-1508: MetastorePlugin.java does not handle properly initialization failure (Vadim Spector, Reviewed by: Sravya Tirukkovalur, Alexander Kolbasov and Hao Hao)
Date Tue, 13 Dec 2016 00:06:30 GMT
Repository: sentry
Updated Branches:
  refs/heads/master ff623a944 -> b479df4ba


SENTRY-1508: MetastorePlugin.java does not handle properly initialization failure (Vadim Spector,
Reviewed by: Sravya Tirukkovalur, Alexander Kolbasov and Hao Hao)

Change-Id: I95c00a92257553da56ee1cae4ae5c8f8d04a2409


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/b479df4b
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/b479df4b
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/b479df4b

Branch: refs/heads/master
Commit: b479df4ba383a8920661b1b20d086e40a8ac2e1c
Parents: ff623a9
Author: hahao <hao.hao@cloudera.com>
Authored: Mon Dec 12 16:05:11 2016 -0800
Committer: hahao <hao.hao@cloudera.com>
Committed: Mon Dec 12 16:05:11 2016 -0800

----------------------------------------------------------------------
 .../apache/sentry/hdfs/ServiceConstants.java    |   2 -
 .../org/apache/sentry/hdfs/MetastorePlugin.java | 634 +++++++++++++------
 .../sentry/hdfs/MetastorePluginWithHA.java      |   2 +-
 3 files changed, 442 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/b479df4b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index 23552c2..cf94785 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -59,8 +59,6 @@ public class ServiceConstants {
     public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT
= 1000;
     public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE =
"sentry.hdfs.sync.metastore.cache.fail.on.partial.update";
     public static final boolean SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT
= true;
-    public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE = "sentry.hdfs.sync.metastore.cache.async-init.enable";
-    public static final boolean SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT
= false;
 
     public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-partitions-per-rpc";
     public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT = 100;

http://git-wip-us.apache.org/repos/asf/sentry/blob/b479df4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
index 085971b..f6661fd 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
@@ -17,17 +17,11 @@
  */
 package org.apache.sentry.hdfs;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -42,87 +36,211 @@ import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
- * Plugin implementation of {@link SentryMetastoreListenerPlugin} that hooks
- * into the sites in the {@link MetaStorePreEventListener} that deal with
- * creation/updation and deletion for paths.
+ * Plugin for the components that need to send path creation, update, and deletion
+ * notifications to the Sentry daemon.
+ *
+ * <p>
+ * Implements {@link SentryMetastoreListenerPlugin} that hooks
+ * into the sites in the {@link MetaStorePreEventListener}.
+ *
+ * <p>
+ * Implementation Notes:
+ *
+ * <ol>
+ * <li>MetastorePlugin performs the following functions:
+ *
+ * <ul>
+ *  <li> At the construction time:
+ *   <ul>
+ *    <li> Initializes local HMS cache with HMS paths information.
+ *    <li> Sends initial HMS paths information to the Sentry daemon.
+ *   </ul>
+ *  </li>
+ *  <li> Upon receiving path update notification from the hosting client code, via
addPath(),
+ *       removePath(), removeAllPaths(), and renameAuthzObject() callback methods:
+ *   <ul>
+ *    <li> Updates local HMS cache accordingly.
+ *    <li> Sends partial update with the assigned sequence number to the Sentry daemon.
+ *    <li> Maintains the latest Sentry partial update sequence number, incrementing
it by 1 on each update.
+ *   </ul>
+ *  </li>
+ *  <li> Periodically, from the housekeeping thread:
+ *   <ul>
+ *    <li> Contacts the Sentry daemon to ask for the sequence number of the latest
received update.
+ *    <li> If the sequence number returned by the Sentry daemon does not match the
sequence number of the
+ *         latest update sent from MetastorePlugin, send the full HMS paths image to the
Sentry daemon.
+ *   </ul>
+ *  </li>
+ * </ul>
+ *
+ * <p>
+ * <li>MetastorePlugin must be a singleton.<br>
+ * Only a single instance of MetastorePlugin can be used. MetastorePlugin has HMS cache
+ * that is updated via calling addPath(), removePath(), removeAllPaths(), renameAuthzObject().
+ * This cache must represent full HMS state at any point, so that full updates, when they
are
+ * needed, would be correct. Channelling different update requests through different MetastorePlugin
+ * instances would make those caches partial and mutually inconsistent.
+ *
+ * <p>
+ * <li>MetastorePlugin is always created, even though initialization may fail.<br>
+ * MetastorePlugin initialization (object construction) may fail for two reasons:
+ * <ul>
+ *  <li> HMS cache cannot be initialized, usually due to some invalid HMS path entries.
+ *  <li> Initial cache cannot be sent to Sentry, e.g. due to the communication problems.
+ * </ul>
+ *
+ * <p>
+ * In either case, MetastorePlugin is still constructed, in consideration of the design
of
+ * the existing client code. However, such an instance is marked as invalid; all update APIs
+ * throw IllegalStateException with the appropriate error message and root cause exception.
+ * <br>TODO: failing to construct MetastorePlugin on initialization failure would be
much cleaner,
+ *       but it has to be done in coordination with the HMS client code.
+ *
+ * <p>
+ * <li>MetastorePlugin guarantees delivery of HMS paths updates to Sentry daemon in
the right order.<br>
+ * Each invocation of addPath(), removePath(), removeAllPaths(), renameAuthzObject()
+ * triggers two actions:
+ * <ul>
+ *  <li> increment update sequence number and update the local cache and
+ *  <li> send partial update to the Sentry daemon.
+ * </ul>
+ *
+ * <p>
+ * Update sequence number is created at first step, and then it travels as part of the update
information,
+ * to the Sentry daemon on the second step. Therefore, the sequence of both steps must be
+ * atomic, to guarantee that updates arrive at the Sentry daemon in the right order,
+ * with sequential update numbers. This is achieved by using notificationLock. The same lock
is used
+ * inside the SyncTask during full Sentry update, when the local and Sentry-side update sequence
+ * numbers are out of sync.
+ *
+ * <p>
+ * <li>MetastorePlugin validates input paths.<br>
+ * Parsing malformed input paths generates SentryMalformedPathException. Since this is a
checked
+ * exception, it is re-thrown wrapped into (un-checked) IllegalArgumentException, to preserve
+ * public APIs' signatures.
+ *
+ * </ol>
  */
+
 public class MetastorePlugin extends SentryMetastoreListenerPlugin {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MetastorePlugin.class);
 
-  private static final String initializationFailureMsg = "Cache failed to initialize, cannot
send path updates to Sentry." +
-          " Please review HMS error logs during startup for additional information. If the
initialization failure is due" +
-          " to SentryMalformedPathException, you will need to rectify the malformed path
in HMS db and restart HMS";
+  /* MetastorePlugin initialization may fail for two different reasons:
+   * a) Failure to initialize HMS paths cache.
+   * b) Failure to send the initial HMS paths to the Sentry daemon.
+   * Each of the two messages below conveys the reason.
+   */
+  private static final String CACHE_INIT_FAILURE_MSG =
+    "Cache failed to initialize, cannot send path updates to Sentry." +
+    " Please review HMS error logs during startup for additional information. If the initialization
failure is due" +
+    " to SentryMalformedPathException, you will need to rectify the malformed path in HMS
db and restart HMS";
+  private static final String SENTRY_INIT_UPDATE_FAILURE_MSG =
+    "Metastore Plugin failed to initialize - cannot send initial HMS updates to Sentry";
+
+  private static final String SENTRY_COMM_FAILURE_MSG = "Cannot Communicate with Sentry";
 
+  private final Configuration conf;
+  private final Configuration sentryConf;
+
+  // guard for all local+Sentry notifications
+  private final ReentrantLock notificationLock = new ReentrantLock();
+  // sentryClient may be re-instantiated in case of suspected communication failure
+  // This code ensures that access to sentryClient is protected by notificationLock
+  private SentryHDFSServiceClient sentryClient;
+  // Has to match the value of seqNum
+  // This code ensures that access to lastSentSeqNum is protected by notificationLock
+  protected long lastSentSeqNum;
+
+  // pathUpdateLock guards access to UpdateableAuthzPaths which is not thread-safe
+  private final ReentrantReadWriteLock pathUpdateLock = new ReentrantReadWriteLock();
+  // access to authzPaths must be protected by pathUpdateLock
+  private final UpdateableAuthzPaths authzPaths;
+
+  // Initialized to some value > 1.
+  protected final AtomicLong seqNum = new AtomicLong(5);
+  private final Throwable initError;
+  private final String initErrorMsg;
+  private final ScheduledExecutorService threadPool; //NOPMD
+
+  private static volatile ScheduledExecutorService lastThreadPool = null;
+
+  /*
+   * This task is scheduled to run periodically, to make sure Sentry has all updates
+   * -- only if MetastorePlugin has been successfully initialized.
+   */
   class SyncTask implements Runnable {
     @Override
     public void run() {
-      if (!notificiationLock.tryLock()) {
+      if (!notificationLock.tryLock()) {
         // No need to sync.. as metastore is in the process of pushing an update..
         return;
       }
-      if (MetastorePlugin.this.authzPaths == null) {
-        LOGGER.warn(initializationFailureMsg);
-        return;
-      }
       try {
-        long lastSeenBySentry =
-            MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum();
+        long lastSeenBySentry = getLastSeenHMSPathSeqNum();
         long lastSent = lastSentSeqNum;
         if (lastSeenBySentry != lastSent) {
           LOGGER.warn("#### Sentry not in sync with HMS [" + lastSeenBySentry + ", "
               + lastSent + "]");
-          PathsUpdate fullImageUpdate =
-              MetastorePlugin.this.authzPaths.createFullImageUpdate(lastSent);
-          notifySentryNoLock(fullImageUpdate);
-          LOGGER.warn("#### Synced Sentry with update [" + lastSent + "]");
+          notifySentryFullUpdate(lastSent);
         }
-      } catch (Exception e) {
-        sentryClient = null;
-        LOGGER.error("Error talking to Sentry HDFS Service !!", e);
+      } catch (Exception ignore) {
+        // all methods inside try {} log errors anyway
       } finally {
-        syncSent = true;
-        notificiationLock.unlock();
+        notificationLock.unlock();
       }
     }
   }
 
-  private final Configuration conf;
-  private SentryHDFSServiceClient sentryClient;
-  private volatile UpdateableAuthzPaths authzPaths;
-  private Lock notificiationLock;
-
-  // Initialized to some value > 1.
-  protected static final AtomicLong seqNum = new AtomicLong(5);
-
-  // Has to match the value of seqNum
-  protected static volatile long lastSentSeqNum = seqNum.get();
-  private volatile boolean syncSent = false;
-  private volatile boolean initComplete = false;
-  private volatile boolean queueFlushComplete = false;
-  private volatile Throwable initError = null;
-  private final Queue<PathsUpdate> updateQueue = new LinkedList<PathsUpdate>();
-
-  private final ExecutorService threadPool; //NOPMD
-  private final Configuration sentryConf;
-
+  /*
+   * Proxy class for RPC calls to the Sentry daemon
+   */
   static class ProxyHMSHandler extends HMSHandler {
     public ProxyHMSHandler(String name, HiveConf conf) throws MetaException {
       super(name, conf);
     }
   }
 
-  public MetastorePlugin(Configuration conf, Configuration sentryConf) {
-    this.notificiationLock = new ReentrantLock();
+  /*
+   * Test-only logic. Testing framework may create multiple MetastorePlugin
+   * instances in sequence, without explicitly shutting down the previous
+   * instance, which does not even have any shutdown API (obvious oversight).
+   * This results in multiple housekeeping thread pools, completely messing
+   * up HMS state on Sentry daemon.
+   * Previous thread pool must be shut down.
+   * In real deployments this code does nothing, because there is only one
+   * instance of MetastorePlugin.
+   */
+  private static synchronized void shutdownPreviousHousekeepingThreadPool() {
+    if (lastThreadPool != null) {
+      LOGGER.info("#### Metastore Plugin: shutting down previous housekeeping thread");
+      try {
+        lastThreadPool.shutdownNow();
+      } catch (Throwable t) {
+        LOGGER.error("#### Metastore Plugin: failure shutting down previous housekeeping
thread", t);
+      }
+      lastThreadPool = null;
+    }
+  }
 
+  public MetastorePlugin(Configuration conf, Configuration sentryConf) {
+    Preconditions.checkNotNull(conf, "NULL Hive Configuration");
+    Preconditions.checkNotNull(sentryConf, "NULL Sentry Configuration");
     if (!(conf instanceof HiveConf)) {
-        String error = "Configuration is not an instanceof HiveConf";
+        String error = "Hive Configuration is not an instanceof HiveConf: " + conf.getClass().getName();
         LOGGER.error(error);
-        throw new RuntimeException(error);
+        throw new IllegalArgumentException(error);
     }
+
+    /*
+     * Test-only logic. See javadoc for this method.
+     */
+    shutdownPreviousHousekeepingThreadPool();
+
     this.conf = new HiveConf((HiveConf)conf);
 
     this.sentryConf = new Configuration(sentryConf);
@@ -130,113 +248,156 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin
{
     this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname);
     this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname);
     this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname);
-    Thread initUpdater = new Thread() {
-      @Override
-      public void run() {
-        MetastoreCacheInitializer cacheInitializer = null;
-        try {
-          cacheInitializer =
-                  new MetastoreCacheInitializer(new ProxyHMSHandler("sentry.hdfs",
-                        (HiveConf) MetastorePlugin.this.conf),
-                          MetastorePlugin.this.conf);
-          MetastorePlugin.this.authzPaths =
-                  cacheInitializer.createInitialUpdate();
-          LOGGER.info("#### Metastore Plugin initialization complete !!");
-          synchronized (updateQueue) {
-            while (!updateQueue.isEmpty()) {
-              PathsUpdate update = updateQueue.poll();
-              if (update != null) {
-                processUpdate(update);
-              }
-            }
-            queueFlushComplete = true;
-          }
-          LOGGER.info("#### Finished flushing queued updates to Sentry !!");
-        } catch (Exception e) {
-          LOGGER.error("#### Could not create Initial AuthzPaths or HMSHandler !!", e);
-          initError = e;
-        } finally {
-          if (cacheInitializer != null) {
-            try {
-              cacheInitializer.close();
-            } catch (Exception e) {
-              LOGGER.info("#### Exception while closing cacheInitializer !!", e);
-            }
-          }
-          initComplete = true;
-        }
+
+    Throwable tmpInitError = null;
+    String tmpInitErrorMsg = null;
+
+    /* Initialization Step #1: initialize local HMS state cache.
+     * To preserve the contract with the existing Hive client code,
+     * MetastorePlugin shall be constructed even if initialization fails,
+     * though it will be completely inoperable.
+     */
+    UpdateableAuthzPaths tmpAuthzPaths;
+    try (MetastoreCacheInitializer cacheInitializer = new MetastoreCacheInitializer(
+        new ProxyHMSHandler("sentry.hdfs", (HiveConf) this.conf),
+        this.conf))
+    {
+      // initialize HMS cache.
+      tmpAuthzPaths = cacheInitializer.createInitialUpdate();
+      LOGGER.info("#### Metastore Plugin HMS cache initialization complete");
+    } catch (Throwable e) {
+      tmpInitError = e;
+      tmpInitErrorMsg = CACHE_INIT_FAILURE_MSG;
+      tmpAuthzPaths = null;
+      LOGGER.error("#### " + tmpInitErrorMsg, e);
+      for (Throwable thr : e.getSuppressed()) {
+        LOGGER.warn("#### Exception while closing cacheInitializer", thr);
       }
-    };
-    if (this.conf.getBoolean(
-            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE,
-            ServerConfig
-                    .SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT)) {
-      LOGGER.warn("#### Metastore Cache initialization is set to aync..." +
-              "HDFS ACL synchronization will not happen until metastore" +
-              "cache initialization is completed !!");
-      initUpdater.start();
-    } else {
-      initUpdater.run(); //NOPMD
     }
+    this.authzPaths = tmpAuthzPaths;
+
+    /* If HMS cache initialization failed, further initialization shall be skipped.
+     * MetastorePlugin is considered non-operational, and all of its public APIs
+     * shall be throwing an exception.
+     */
+    if (tmpInitError != null) {
+      this.threadPool = null;
+      this.initError = tmpInitError;
+      this.initErrorMsg = tmpInitErrorMsg;
+      return;
+    }
+
+    /* Initialization Step #2: push initial HMS state to Sentry.
+     * Synchronization by notificationLock is for visibility of changes to sentryClient.
+     */
+    notificationLock.lock();
     try {
-      sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
-    } catch (Exception e) {
-      sentryClient = null;
-      LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
+      this.lastSentSeqNum = seqNum.get();
+      notifySentryFullUpdate(lastSentSeqNum);
+      LOGGER.info("#### Metastore Plugin Sentry full initial update complete");
+    } catch (Throwable e) {
+      tmpInitError = e;
+      tmpInitErrorMsg = SENTRY_INIT_UPDATE_FAILURE_MSG;
+      LOGGER.error("#### " + tmpInitErrorMsg, e);
+    } finally {
+      notificationLock.unlock();
+    }
+
+    this.initError = tmpInitError;
+    this.initErrorMsg = tmpInitErrorMsg;
+
+    /* If sending HMS state to Sentry failed, further initialization shall be skipped.
+     * MetastorePlugin is considered non-operational, and all of its public APIs
+     * shall be throwing an exception.
+     */
+    if (this.initError != null) {
+      this.threadPool = null;
+      return;
     }
-    ScheduledExecutorService newThreadPool = Executors.newScheduledThreadPool(1);
-    newThreadPool.scheduleWithFixedDelay(new SyncTask(),
-            this.conf.getLong(ServerConfig
-                            .SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
-                    ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT),
-            this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS,
-                    ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT),
-            TimeUnit.MILLISECONDS);
-    this.threadPool = newThreadPool;
+
+    /* Initialization Step #3: schedule SyncTask to run periodically, to make
+     * sure Sentry has the current HMS state.
+     */
+    this.threadPool = Executors.newScheduledThreadPool(1);
+    this.threadPool.scheduleWithFixedDelay(new SyncTask(),
+      this.conf.getLong(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
+                        ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT),
+      this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS,
+                        ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT),
+      TimeUnit.MILLISECONDS);
+    MetastorePlugin.lastThreadPool = this.threadPool;
+    LOGGER.info("#### Metastore Plugin Sentry initialization complete");
   }
 
   @Override
   public void addPath(String authzObj, String path) {
+    assertInit();
+
+    // validate / parse inputs
     List<String> pathTree = null;
     try {
       pathTree = PathsUpdate.parsePath(path);
     } catch (SentryMalformedPathException e) {
-      LOGGER.error("Unexpected path in addPath: authzObj = " + authzObj + " , path = " +
path);
-      e.printStackTrace();
-      return;
+      String err = "Unexpected path in addPath: authzObj = " + authzObj + " , path = " +
path;
+      LOGGER.error(err, e);
+      throw new IllegalArgumentException(err, e);
     }
     if(pathTree == null) {
+      LOGGER.debug("#### HMS Path Update ["
+        + "OP : addPath, "
+        + "authzObj : " + authzObj.toLowerCase() + ", "
+        + "path : " + path + "] - nothing to add");
       return;
     }
     LOGGER.debug("#### HMS Path Update ["
         + "OP : addPath, "
         + "authzObj : " + authzObj.toLowerCase() + ", "
         + "path : " + path + "]");
-    PathsUpdate update = createHMSUpdate();
-    update.newPathChange(authzObj.toLowerCase()).addToAddPaths(pathTree);
-    notifySentryAndApplyLocal(update);
+
+    // do local and remote updates
+    notificationLock.lock();
+    try {
+      PathsUpdate update = createHMSUpdate();
+      update.newPathChange(authzObj.toLowerCase()).addToAddPaths(pathTree);
+      updateLocalCacheAndNotifySentry(update);
+    } finally {
+      notificationLock.unlock();
+    }
   }
 
   @Override
   public void removeAllPaths(String authzObj, List<String> childObjects) {
+    assertInit();
+
+    // validate / parse inputs
     LOGGER.debug("#### HMS Path Update ["
         + "OP : removeAllPaths, "
         + "authzObj : " + authzObj.toLowerCase() + ", "
         + "childObjs : " + (childObjects == null ? "[]" : childObjects) + "]");
-    PathsUpdate update = createHMSUpdate();
-    if (childObjects != null) {
-      for (String childObj : childObjects) {
-        update.newPathChange(authzObj.toLowerCase() + "." + childObj).addToDelPaths(
+
+    // do local and remote updates
+    notificationLock.lock();
+    try {
+      PathsUpdate update = createHMSUpdate();
+      if (childObjects != null) {
+        for (String childObj : childObjects) {
+          update.newPathChange(authzObj.toLowerCase() + "." + childObj).addToDelPaths(
             Lists.newArrayList(PathsUpdate.ALL_PATHS));
+        }
       }
-    }
-    update.newPathChange(authzObj.toLowerCase()).addToDelPaths(
+      update.newPathChange(authzObj.toLowerCase()).addToDelPaths(
             Lists.newArrayList(PathsUpdate.ALL_PATHS));
-    notifySentryAndApplyLocal(update);
+      updateLocalCacheAndNotifySentry(update);
+    } finally {
+      notificationLock.unlock();
+    }
   }
 
   @Override
   public void removePath(String authzObj, String path) {
+    assertInit();
+
+    // validate / parse inputs
     if ("*".equals(path)) {
       removeAllPaths(authzObj.toLowerCase(), null);
     } else {
@@ -244,154 +405,241 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin
{
       try {
         pathTree = PathsUpdate.parsePath(path);
       } catch (SentryMalformedPathException e) {
-        LOGGER.error("Unexpected path in removePath: authzObj = " + authzObj + " , path =
" + path);
-        e.printStackTrace();
-        return;
+        String err = "Unexpected path in removePath: authzObj = " + authzObj + " , path =
" + path;
+        LOGGER.error(err, e);
+        throw new IllegalArgumentException(err, e);
       }
       if(pathTree == null) {
+        LOGGER.debug("#### HMS Path Update ["
+          + "OP : removePath, "
+          + "authzObj : " + authzObj.toLowerCase() + ", "
+          + "path : " + path + "] - nothing to remove");
         return;
       }
       LOGGER.debug("#### HMS Path Update ["
           + "OP : removePath, "
           + "authzObj : " + authzObj.toLowerCase() + ", "
           + "path : " + path + "]");
-      PathsUpdate update = createHMSUpdate();
-      update.newPathChange(authzObj.toLowerCase()).addToDelPaths(pathTree);
-      notifySentryAndApplyLocal(update);
+
+      // do local and remote updates
+      notificationLock.lock();
+      try {
+        PathsUpdate update = createHMSUpdate();
+        update.newPathChange(authzObj.toLowerCase()).addToDelPaths(pathTree);
+        updateLocalCacheAndNotifySentry(update);
+      } finally {
+        notificationLock.unlock();
+      }
     }
   }
 
   @Override
   public void renameAuthzObject(String oldName, String oldPath, String newName,
       String newPath) {
+    assertInit();
+
+    // validate / parse inputs
     String oldNameLC = oldName != null ? oldName.toLowerCase() : null;
     String newNameLC = newName != null ? newName.toLowerCase() : null;
-    PathsUpdate update = createHMSUpdate();
     LOGGER.debug("#### HMS Path Update ["
         + "OP : renameAuthzObject, "
-        + "oldName : " + oldNameLC + ","
-        + "oldPath : " + oldPath + ","
-        + "newName : " + newNameLC + ","
-        + "newPath : " + newPath + "]");
+        + "oldName : " + oldNameLC + ", "
+        + "oldPath : " + oldPath   + ", "
+        + "newName : " + newNameLC + ", "
+        + "newPath : " + newPath   + "]");
     List<String> newPathTree = null;
     try {
       newPathTree = PathsUpdate.parsePath(newPath);
     } catch (SentryMalformedPathException e) {
-      LOGGER.error("Unexpected path in renameAuthzObject while parsing newPath: oldName="
+ oldName + ", oldPath=" + oldPath +
-      ", newName=" + newName + ", newPath=" + newPath);
-      e.printStackTrace();
-      return;
-    }
-
-    if( newPathTree != null ) {
-      update.newPathChange(newNameLC).addToAddPaths(newPathTree);
+      String err = "Unexpected path in renameAuthzObject while parsing newPath: oldName="
+ oldName + ", oldPath=" + oldPath +
+        ", newName=" + newName + ", newPath=" + newPath;
+      LOGGER.error(err, e);
+      throw new IllegalArgumentException(err, e);
     }
     List<String> oldPathTree = null;
     try {
       oldPathTree = PathsUpdate.parsePath(oldPath);
     } catch (SentryMalformedPathException e) {
-      LOGGER.error("Unexpected path in renameAuthzObject while parsing oldPath: oldName="
+ oldName + ", oldPath=" + oldPath +
-              ", newName=" + newName + ", newPath=" + newPath);
-      e.printStackTrace();
-      return;
+      String err = "Unexpected path in renameAuthzObject while parsing oldPath: oldName="
+ oldName + ", oldPath=" + oldPath +
+        ", newName=" + newName + ", newPath=" + newPath;
+      LOGGER.error(err, e);
+      throw new IllegalArgumentException(err, e);
     }
 
-    if( oldPathTree != null ) {
-      update.newPathChange(oldNameLC).addToDelPaths(oldPathTree);
+    // do local and remote updates
+    notificationLock.lock();
+    try {
+      PathsUpdate update = createHMSUpdate();
+      if( newPathTree != null ) {
+        update.newPathChange(newNameLC).addToAddPaths(newPathTree);
+      }
+      if( oldPathTree != null ) {
+        update.newPathChange(oldNameLC).addToDelPaths(oldPathTree);
+      }
+      updateLocalCacheAndNotifySentry(update);
+    } finally {
+      notificationLock.unlock();
     }
-    notifySentryAndApplyLocal(update);
   }
 
-  private SentryHDFSServiceClient getClient() {
+  /*
+   * Instantiate client (unless it's already instantiated) to talk to Sentry service.
+   * Call must be protected by notificationLock.
+   */
+  private SentryHDFSServiceClient getClient() throws Exception {
+    assert notificationLock.isHeldByCurrentThread() : "Internal Faulure: access to Sentry
client is nt protected by notificationLock";
     if (sentryClient == null) {
       try {
         sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
       } catch (Exception e) {
         sentryClient = null;
-        LOGGER.error("#### Could not connect to Sentry HDFS Service !!", e);
+        final String err = SENTRY_COMM_FAILURE_MSG;
+        LOGGER.error(err, e);
+        throw new Exception(err, e);
       }
     }
     return sentryClient;
   }
 
+  /*
+   * Initialize HMS update object and assign its sequence number.
+   * Call must be protected by notificationLock.
+   */
   private PathsUpdate createHMSUpdate() {
     PathsUpdate update = new PathsUpdate(seqNum.incrementAndGet(), false);
     LOGGER.debug("#### Creating HMS Path Update SeqNum : [" + seqNum.get() + "]");
     return update;
   }
 
-  protected void notifySentryNoLock(PathsUpdate update) {
+  /*
+   * Get the last seen HMS path update sequence number from Sentry service.
+   * Call must be protected by notificationLock.
+   */
+  private long getLastSeenHMSPathSeqNum() throws Exception {
+    try {
+      return getClient().getLastSeenHMSPathSeqNum();
+    } catch (Exception e) {
+      final String err = "Could not fetch the last seen HMS Path Sequence number from Sentry
HDFS Service";
+      LOGGER.error(err, e);
+      resetClient();
+      throw e;
+    }
+  }
+
+  /*
+   * Send update to Sentry service.
+   * This method, when called from notifySentry(), is followed by updating lastSentSeqNum.
+   * When called directly, to send full updates (i.e. during initialization and from SyncTask),
+   * the update sequence number does not change.
+   * Call must be protected by notificationLock.
+   */
+  private void notifySentry_NoSeqNumIncr(PathsUpdate update) {
     final Timer.Context timerContext =
         SentryHdfsMetricsUtil.getNotifyHMSUpdateTimer.time();
     try {
       getClient().notifyHMSUpdate(update);
     } catch (Exception e) {
-      LOGGER.error("Could not send update to Sentry HDFS Service !!", e);
+      final String err = "Could not send update to Sentry HDFS Service";
+      LOGGER.error(err, e);
+      resetClient();
       SentryHdfsMetricsUtil.getFailedNotifyHMSUpdateCounter.inc();
+      throw new RuntimeException(err, e);
     } finally {
       timerContext.stop();
     }
   }
 
+  /**
+   * Send update to Sentry service and update last sent sequence number.
+   * Called only if MetastorePlugin has been successfully initialized.
+   * Call must be protected by notificationLock.
+   */
   protected void notifySentry(PathsUpdate update) {
-    notificiationLock.lock();
     try {
-      if (!syncSent) {
-        new SyncTask().run();
-      }
-
-      notifySentryNoLock(update);
+      notifySentry_NoSeqNumIncr(update);
     } finally {
       lastSentSeqNum = update.getSeqNum();
-      notificiationLock.unlock();
       LOGGER.debug("#### HMS Path Last update sent : ["+ lastSentSeqNum + "]");
     }
   }
 
+  /*
+   * Send full update to Sentry service.
+   * Called only if MetastorePlugin has been successfully initialized.
+   * Call must be protected by notificationLock.
+   */
+  private void notifySentryFullUpdate(long lastSent) {
+    PathsUpdate fullImageUpdate = null;
+    // access to authzPaths should be consistently protected by pathUpdateLock
+    pathUpdateLock.readLock().lock();
+    try {
+      fullImageUpdate = authzPaths.createFullImageUpdate(lastSent);
+    } finally {
+      pathUpdateLock.readLock().unlock();
+    }
+    notifySentry_NoSeqNumIncr(fullImageUpdate);
+    LOGGER.warn("#### Synced Sentry with update [" + lastSent + "]");
+  }
+
+  /*
+   * When suspecting sentryClient comm error - reset the client
+   * Call must be protected by notificationLock.
+   */
+  private void resetClient() {
+    if (sentryClient != null) {
+      try {
+        sentryClient.close();
+      } catch (Exception ignore) {
+      }
+      sentryClient = null;
+    }
+  }
+
+  /**
+   * Apply paths update to local cache.
+   * Called only if MetastorePlugin has been successfully initialized.
+   * Call must be protected by notificationLock.
+   */
   protected void applyLocal(PathsUpdate update) {
     final Timer.Context timerContext =
         SentryHdfsMetricsUtil.getApplyLocalUpdateTimer.time();
-    if(authzPaths == null) {
-      LOGGER.error(initializationFailureMsg);
-      return;
+    try {
+      authzPaths.updatePartial(Lists.newArrayList(update), pathUpdateLock);
+    } finally {
+      timerContext.stop();
     }
-    authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock());
-    timerContext.stop();
     SentryHdfsMetricsUtil.getApplyLocalUpdateHistogram.update(
         update.getPathChanges().size());
   }
 
-  private void notifySentryAndApplyLocal(PathsUpdate update) {
-    if(authzPaths == null) {
-      LOGGER.error(initializationFailureMsg);
-      return;
-    }
-    if (initComplete) {
-      processUpdate(update);
-    } else {
-      if (initError == null) {
-        synchronized (updateQueue) {
-          if (!queueFlushComplete) {
-            updateQueue.add(update);
-          } else {
-            processUpdate(update);
-          }
-        }
-      } else {
-        StringWriter sw = new StringWriter();
-        initError.printStackTrace(new PrintWriter(sw));
-        LOGGER.error("#### Error initializing Metastore Plugin" +
-                "[" + sw.toString() + "] !!");
-        throw new RuntimeException(initError);
-      }
-      LOGGER.warn("#### Path update [" + update.getSeqNum() + "] not sent to Sentry.." +
-              "Metastore hasn't been initialized yet !!");
-    }
+  /*
+   * Apply paths update to local cache.
+   * Send partial update to Sentry service.
+   * Called only if MetastorePlugin has been successfully initialized.
+   * Call must be protected by notificationLock.
+   */
+  private void updateLocalCacheAndNotifySentry(PathsUpdate update) {
+    applyLocal(update);
+    notifySentry(update);
   }
 
+  /**
+   * Apply paths update to local cache and send partial update to Sentry.
+   * Called only if MetastorePlugin has been successfully initialized.
+   * Call must be protected by notificationLock.
+   */
   protected void processUpdate(PathsUpdate update) {
-    applyLocal(update);
-    notifySentry(update);
+    updateLocalCacheAndNotifySentry(update);
+  }
+
+  /*
+   * Check successful initialization first, in each update callback method.
+   * Null initError guarantees successful initialization.
+   */
+  private void assertInit() {
+    if (initError != null) {
+      throw new IllegalStateException(initErrorMsg, initError);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/b479df4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
index 6476a01..32b635f 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
@@ -75,7 +75,7 @@ public class MetastorePluginWithHA extends MetastorePlugin {
         new SentryMetastoreHACacheListener(this));
     // start seq# from the last global seq
     seqNum.set(pluginCacheSync.getUpdateCounter());
-    MetastorePlugin.lastSentSeqNum = seqNum.get();
+    this.lastSentSeqNum = seqNum.get();
   }
 
   @Override


Mime
View raw message