sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sra...@apache.org
Subject [2/2] sentry git commit: SENTRY-1317: Implement fencing required for active/standby (Colin P. McCabe , Reviewed by: Hao Hao and Sravya Tirukkovalur)
Date Tue, 12 Jul 2016 20:10:52 GMT
SENTRY-1317: Implement fencing required for active/standby (Colin P. McCabe , Reviewed by: Hao Hao and Sravya Tirukkovalur)

New fencing and active/passive code
- Activator to store the state about whether the daemon is active or not, as well as manage fencing
- Create Fencer to implement SQL fencing.
- Add SqlAccessor to talk directly to SQL databases
LeaderStatus: generate shorter incarnation IDs by using base64.
LeaderStatusAdaptor: implement close()
Remove old code which is no longer used
- HAContext
- PluginCacheSyncUtil
- TestHAUpdateForwarder
- ServiceRegister
SentryStore
- move DataNucleus properties setup into a utility function
- Remove unused DEFAULT_DATA_DIR variable (it's not used anywhere in the code)
- SentryStore should maintain a reference to the Activator
Add SentryStandbyException to indicate that the daemon is currently standby
Move SENTRY_ZK_JAAS_NAME from HAContext to SentryConstants
DelegateSentryStore: make some fields final

Change-Id: Ic41711ecbc218bb21e3ca3120998866d65e16493


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/a70cff99
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/a70cff99
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/a70cff99

Branch: refs/heads/sentry-ha-redesign
Commit: a70cff999180486183e68fd9229818970f6b8fc1
Parents: de7c26a
Author: Sravya Tirukkovalur <sravya@apache.org>
Authored: Tue Jul 12 13:03:23 2016 -0700
Committer: Sravya Tirukkovalur <sravya@apache.org>
Committed: Tue Jul 12 13:10:38 2016 -0700

----------------------------------------------------------------------
 .../exception/SentryStandbyException.java       |  33 ++
 .../core/common/utils/SentryConstants.java      |   3 +
 .../apache/sentry/hdfs/PluginCacheSyncUtil.java | 251 ---------------
 .../sentry/hdfs/SentryHdfsMetricsUtil.java      |   8 -
 .../sentry/hdfs/TestHAUpdateForwarder.java      |  66 ----
 .../provider/db/service/persistent/Fencer.java  | 242 +++++++++++++++
 .../db/service/persistent/HAContext.java        | 262 ----------------
 .../db/service/persistent/SentryStore.java      |  32 +-
 .../db/service/persistent/ServiceRegister.java  |  52 ----
 .../db/service/persistent/SqlAccessor.java      | 309 +++++++++++++++++++
 .../thrift/SentryPolicyStoreProcessor.java      |  31 +-
 .../apache/sentry/service/thrift/Activator.java | 112 +++++++
 .../sentry/service/thrift/Activators.java       |  69 +++++
 .../sentry/service/thrift/LeaderStatus.java     |  31 +-
 .../service/thrift/LeaderStatusAdaptor.java     |  41 ++-
 .../sentry/service/thrift/SentryService.java    |  33 +-
 .../sentry/service/thrift/ServiceConstants.java |   4 +
 .../persistent/SentryStoreIntegrationBase.java  |  15 +
 .../TestPrivilegeOperatePersistence.java        |  22 +-
 .../db/service/persistent/TestSentryStore.java  |  14 +
 .../persistent/TestSentryStoreImportExport.java |  16 +-
 .../service/persistent/TestSentryVersion.java   |  18 ++
 .../thrift/SentryServiceIntegrationBase.java    |   3 +-
 .../sentry/service/thrift/TestLeaderStatus.java |  26 ++
 24 files changed, 982 insertions(+), 711 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
new file mode 100644
index 0000000..73c7e4e
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.exception;
+
+/**
+ * An exception which indicates that the current server is standby.
+ */
+public class SentryStandbyException extends SentryUserException {
+  private static final long serialVersionUID = 2162010615815L;
+
+  public SentryStandbyException(String msg) {
+    super(msg);
+  }
+
+  public SentryStandbyException(String msg, String reason) {
+    super(msg, reason);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java
index 3da4906..c094058 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java
@@ -40,4 +40,7 @@ public class SentryConstants {
   public static final String RESOURCE_WILDCARD_VALUE_ALL = "ALL";
   public static final String RESOURCE_WILDCARD_VALUE_SOME = "+";
   public static final String ACCESS_ALLOW_URI_PER_DB_POLICYFILE = "sentry.allow.uri.db.policyfile";
+
+  public static final String SENTRY_ZK_JAAS_NAME = "Sentry";
+  public static final String CURRENT_INCARNATION_ID_KEY = "current.incarnation.key";
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
deleted file mode 100644
index 4ce16c7..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.codahale.metrics.Timer;
-import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-import org.apache.sentry.hdfs.Updateable.Update;
-import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
-import org.apache.sentry.provider.db.service.persistent.HAContext;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Utility class for handling the cache update syncup via Curator path cache It
- * creates the path cache, a distributed lock and counter. The updated API
- * updates the counter, creates a znode zpath/counter and writes the data to it.
- * The caller should provider the cache callback handler class that posts the
- * update object to the required cache
- */
-public class PluginCacheSyncUtil {
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(PluginCacheSyncUtil.class);
-  public static final long CACHE_GC_SIZE_THRESHOLD_HWM = 100;
-  public static final long CACHE_GC_SIZE_THRESHOLD_LWM = 50;
-  public static final long CACHE_GC_SIZE_MAX_CLEANUP = 1000;
-  public static final long ZK_COUNTER_INIT_VALUE = 4;
-  public static final long GC_COUNTER_INIT_VALUE = ZK_COUNTER_INIT_VALUE + 1;
-
-  private final String zkPath;
-  private final HAContext haContext;
-  private final PathChildrenCache cache;
-  private InterProcessSemaphoreMutex updatorLock, gcLock;
-  private int lockTimeout;
-  private DistributedAtomicLong updateCounter, gcCounter;
-  private final ScheduledExecutorService gcSchedulerForZk = Executors
-      .newScheduledThreadPool(1);
-
-  public PluginCacheSyncUtil(String zkPath, final Configuration conf,
-      PathChildrenCacheListener cacheListener) throws SentryPluginException {
-    this.zkPath = zkPath;
-    // Init ZK connection
-    try {
-      haContext = HAContext.getHAContext(conf);
-    } catch (Exception e) {
-      throw new SentryPluginException("Error creating HA context ", e);
-    }
-    haContext.startCuratorFramework();
-
-    // Init path cache
-    cache = new PathChildrenCache(haContext.getCuratorFramework(), zkPath
-        + "/cache", true);
-    // path cache callback
-    cache.getListenable().addListener(cacheListener);
-    try {
-      cache.start();
-    } catch (Exception e) {
-      throw new SentryPluginException("Error creating ZK PathCache ", e);
-    }
-    updatorLock = new InterProcessSemaphoreMutex(
-        haContext.getCuratorFramework(), zkPath + "/lock");
-    lockTimeout = conf.getInt(
-        ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
-        ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
-    gcLock = new InterProcessSemaphoreMutex(
-        haContext.getCuratorFramework(), zkPath + "/gclock");
-
-    updateCounter = new DistributedAtomicLong(haContext.getCuratorFramework(),
-        zkPath + "/counter", haContext.getRetryPolicy());
-    try {
-      updateCounter.initialize(ZK_COUNTER_INIT_VALUE);
-    } catch (Exception e) {
-      LOGGER.error("Error initializing  counter for zpath " + zkPath, e);
-    }
-
-    // GC setup
-    gcCounter = new DistributedAtomicLong(haContext.getCuratorFramework(),
-        zkPath + "/gccounter", haContext.getRetryPolicy());
-    try {
-      gcCounter.initialize(GC_COUNTER_INIT_VALUE);
-    } catch (Exception e) {
-      LOGGER.error("Error initializing  counter for zpath " + zkPath, e);
-    }
-    final Runnable gcRunner = new Runnable() {
-      public void run() {
-        gcPluginCache(conf);
-      }
-    };
-    gcSchedulerForZk.scheduleAtFixedRate(gcRunner, 10, 10, TimeUnit.MINUTES);
-  }
-
-  public void handleCacheUpdate(Update update) throws SentryPluginException {
-    final Timer.Context timerContext = SentryHdfsMetricsUtil.getCacheSyncToZKTimer.time();
-    // post message to ZK cache
-    try {
-      // Acquire ZK lock for update cache sync. This ensures that the counter
-      // increment and znode creation is atomic operation
-      if (!updatorLock.acquire(lockTimeout, TimeUnit.MILLISECONDS)) {
-        throw new SentryPluginException(
-            "Failed to get ZK lock for update cache syncup");
-      }
-    } catch (Exception e1) {
-      // Stop timer in advance
-      timerContext.stop();
-      SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
-      throw new SentryPluginException(
-          "Error getting ZK lock for update cache syncup" + e1, e1);
-    }
-    boolean failed = false;
-    try {
-      try {
-        // increment the global sequence counter if this is not a full update
-        if (!update.hasFullImage()) {
-          update.setSeqNum(updateCounter.increment().postValue());
-        } else {
-          if (updateCounter.get().preValue() < update.getSeqNum()) {
-            updateCounter.add(update.getSeqNum() - updateCounter.get().preValue());
-          }
-        }
-      } catch (Exception e1) {
-        failed = true;
-        throw new SentryPluginException(
-            "Error setting ZK counter for update cache syncup" + e1, e1);
-      }
-
-      // Create a new znode with the sequence number and write the update data
-      // into it
-      String updateSeq = String.valueOf(update.getSeqNum());
-      String newPath = ZKPaths.makePath(zkPath + "/cache", updateSeq);
-      try {
-        haContext.getCuratorFramework().create().creatingParentsIfNeeded()
-            .forPath(newPath, update.serialize());
-      } catch (Exception e) {
-        failed = true;
-        throw new SentryPluginException("error posting update to ZK ", e);
-      }
-    } finally {
-      // release the ZK lock
-      try {
-        updatorLock.release();
-      } catch (Exception e) {
-        // Stop timer in advance
-        timerContext.stop();
-        SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
-        throw new SentryPluginException(
-            "Error releasing ZK lock for update cache syncup" + e, e);
-      }
-      timerContext.stop();
-      if (failed) {
-        SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
-      }
-    }
-  }
-
-  public static void setUpdateFromChildEvent(PathChildrenCacheEvent cacheEvent,
-      Update update) throws IOException {
-    byte eventData[] = cacheEvent.getData().getData();
-    update.deserialize(eventData);
-    String seqNum = ZKPaths.getNodeFromPath(cacheEvent.getData().getPath());
-    update.setSeqNum(Integer.valueOf(seqNum));
-  }
-
-  public void close() throws IOException {
-    cache.close();
-  }
-
-  public long getUpdateCounter() throws Exception {
-    return updateCounter.get().preValue();
-  }
-
-  /**
-   * Cleanup old znode of the plugin cache. The last cleaned and last created
-   * node counters are stored in ZK. If the number of available nodes are more
-   * than the high water mark, then we delete the old nodes till we reach low
-   * water mark. The scheduler periodically runs the cleanup routine
-   * @param conf
-   */
-  @VisibleForTesting
-  public void gcPluginCache(Configuration conf) {
-    try {
-      // If we can acquire gc lock, then continue with znode cleanup
-      if (!gcLock.acquire(500, TimeUnit.MILLISECONDS)) {
-        return;
-      }
-
-      // If we have passed the High watermark, then start the cleanup
-      Long updCount = updateCounter.get().preValue();
-      Long gcCount = gcCounter.get().preValue();
-      if (updCount - gcCount > CACHE_GC_SIZE_THRESHOLD_HWM) {
-        Long numNodesToClean = Math.min(updCount - gcCount
-            - CACHE_GC_SIZE_THRESHOLD_LWM, CACHE_GC_SIZE_MAX_CLEANUP);
-        for (Long nodeNum = gcCount; nodeNum < gcCount + numNodesToClean; nodeNum++) {
-          String pathToDelete = ZKPaths.makePath(zkPath + "/cache",
-              Long.toString(nodeNum));
-          try {
-            haContext.getCuratorFramework().delete().forPath(pathToDelete);
-            gcCounter.increment();
-            LOGGER.debug("Deleted znode " + pathToDelete);
-          } catch (NoNodeException eN) {
-            // We might have endup with holes in the node counter due to network/ZK errors
-            // Ignore the delete error if the node doesn't exist and move on
-            gcCounter.increment();
-          } catch (Exception e) {
-            LOGGER.info("Error cleaning up node " + pathToDelete, e);
-            break;
-          }
-        }
-      }
-    } catch (Exception e) {
-      LOGGER.warn("Error cleaning the cache", e);
-    } finally {
-      if (gcLock.isAcquiredInThisProcess()) {
-        try {
-          gcLock.release();
-        } catch (Exception e) {
-          LOGGER.warn("Error releasing gc lock", e);
-        }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
index 5bf2f6e..e68c708 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
@@ -91,14 +91,6 @@ public class SentryHdfsMetricsUtil {
       MetricRegistry.name(MetastorePlugin.class, "apply-local-update",
           "path-change-size"));
 
-  // Metrics for handleCacheUpdate to ZK in PluginCacheSyncUtil
-  // The time used for each handleCacheUpdate
-  public static final Timer getCacheSyncToZKTimer = sentryMetrics.getTimer(
-      MetricRegistry.name(PluginCacheSyncUtil.class, "cache-sync-to-zk"));
-  // The number of failed handleCacheUpdate
-  public static final Counter getFailedCacheSyncToZK = sentryMetrics.getCounter(
-      MetricRegistry.name(PluginCacheSyncUtil.class, "cache-sync-to-zk", "failed-num"));
-  
   private SentryHdfsMetricsUtil() {
     // Make constructor private to avoid instantiation
   }

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
deleted file mode 100644
index 5246e05..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
-import org.apache.curator.test.TestingServer;
-import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
-import org.apache.sentry.provider.db.service.persistent.HAContext;
-import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class TestHAUpdateForwarder extends TestUpdateForwarder {
-
-  private TestingServer server;
-
-  @Before
-  public void setup() throws Exception {
-    server = new TestingServer();
-    server.start();
-    testConf.set(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
-        server.getConnectString());
-    testConf.setBoolean(ServerConfig.SENTRY_HA_ENABLED, true);
-  }
-
-  @Override
-  @After
-  public void cleanup() throws Exception {
-    super.cleanup();
-    server.stop();
-    HAContext.clearServerContext();
-  }
-
-  @Test
-  public void testThriftSerializer() throws Exception {
-    List<String> addGroups = Lists.newArrayList("g1", "g2", "g3");
-    List<String> delGroups = Lists.newArrayList("d1", "d2", "d3");
-    String roleName = "testRole1";
-
-    TRoleChanges roleUpdate = new TRoleChanges(roleName, addGroups, delGroups);
-    TRoleChanges newRoleUpdate = (TRoleChanges) ThriftSerializer.deserialize(
-        roleUpdate, ThriftSerializer.serialize(roleUpdate));
-    assertEquals(roleUpdate, newRoleUpdate);
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java
new file mode 100644
index 0000000..14cdde3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import java.util.List;
+
+import com.google.common.base.Joiner;
+
+import javax.jdo.JDOException;
+import javax.jdo.JDOFatalDataStoreException;
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.Query;
+import javax.jdo.Transaction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Fences the SQL database.<p/>
+ *
+ * Fencing ensures that any SQL requests that were sent by a previously active
+ * (but now standby) sentry daemon will not be honored.  It also ensures that if
+ * users start up multiple non-HA sentry daemons, only one can become
+ * active.<p/>
+ *
+ * The fencer uses a special SQL table, the SENTRY_FENCE table.  When a sentry
+ * process becomes active, it renames this table so that the name contains the
+ * current "incarnation ID."  The incarnation ID is a randomly generated 128-bit
+ * ID, which changes each time the process is restarted.  From that point
+ * forward, the sentry process includes a SELECT query for the SENTRY_FENCE
+ * table in all update transactions.  This ensures that if the SENTRY_FENCE
+ * table is subsequently renamed again, those update transactions will not
+ * succeed.<p/>
+ *
+ * It is important to distinguish between fencing and leader election.
+ * ZooKeeper is responsible for leader election and ensures that there is only
+ * ever one active sentry daemon at any one time.  However, sentry exists in an
+ * asynchronous network where requests from a previously active daemon may be
+ * arbitrarily delayed before reaching the SQL database.  There is also a delay
+ * between a process being "de-leadered" by ZooKeeper, and the process itself
+ * becoming aware of this situation.  Java's garbage collection pauses tend to
+ * expose these kinds of race conditions.  The SQL database must be prepared to
+ * reject these stale updates.<p/>
+ *
+ * Given that we need this SQL fencing, why bother with ZooKeeper at all?
+ * ZooKeeper detects when nodes have stopped responding, and elects a new
+ * leader.  The SQL fencing code cannot do that.<p/>
+ */
+public class Fencer {
+  private static final Logger LOGGER = LoggerFactory
+          .getLogger(Fencer.class);
+
+  /**
+   * The base name of the sentry fencer table.<p/>
+   *
+   * We will append the incarnation ID on to this base name to make the final
+   * table name.
+   */
+  private final static String SENTRY_FENCE_TABLE_BASE = "SENTRY_FENCE";
+
+  /**
+   * The fence table name, including the incarnation ID.
+   */
+  private final String tableIncarnationName;
+
+  /**
+   * The SQL accessor that we're using.
+   */
+  private final SqlAccessor sql;
+
+  /**
+   * Create a fencer for the given sentry daemon incarnation.
+   *
+   * @param incarnationId     The ID of the current sentry daemon incarnation.
+   * @param pmf               The PersistenceManagerFactory to use.
+   */
+  public Fencer(String incarnationId, PersistenceManagerFactory pmf) {
+    this.tableIncarnationName = String.
+        format("%s_%s", SENTRY_FENCE_TABLE_BASE, incarnationId);
+    this.sql = SqlAccessor.get(pmf);
+  }
+
+  /**
+   * Finds the name of the fencing table.<p/>
+   *
+   * The name of the fencing table will always begin with SENTRY_FENCE,
+   * but it may have the ID of a previous sentry incarnation tacked on to it.
+   *
+   * @return the current name of the fencing table, or null if there is none.
+   *
+   * @throws JDOFatalDataStoreException    If there is more than one sentry
+   *                                       fencing table.
+   * @throws JDOException                  If there was a JDO error.
+   */
+  private String findFencingTable(PersistenceManagerFactory pmf) {
+    // Perform a SQL query to find the name of the fencing table.
+    PersistenceManager pm = pmf.getPersistenceManager();
+    Query query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE,
+        sql.getFindTableByPrefixSql(SENTRY_FENCE_TABLE_BASE));
+    Transaction tx = pm.currentTransaction();
+    try {
+      tx.begin();
+      List<Object> results = (List<Object>) query.execute();
+      if (results.isEmpty()) {
+        return null;
+      } else if (results.size() != 1) {
+        throw new JDOFatalDataStoreException(
+            "Found more than one table whose name begins with " +
+            "SENTRY_UPDATE_LOG: " + Joiner.on(",").join(results));
+      }
+      String tableName = (String)results.get(0);
+      if (!tableName.startsWith(SENTRY_FENCE_TABLE_BASE)) {
+        throw new JDOFatalDataStoreException(
+            "The result of our attempt to locate the update log table was " +
+            "a table name which did not begin with " +
+            SENTRY_FENCE_TABLE_BASE + ", named " + tableName);
+      }
+      LOGGER.info("Found sentry update log table named " + tableName);
+      tx.commit();
+      return tableName;
+    } finally {
+      if (tx.isActive()) {
+        tx.rollback();
+      }
+      query.closeAll();
+    }
+  }
+
+  /**
+   * Creates the fencing table.
+   *
+   * @param pmf                 The PersistenceManagerFactory to use.
+   *
+   * @throws  JDOException      If there was a JDO error.
+   */
+  private void createFenceTable(PersistenceManagerFactory pmf) {
+    PersistenceManager pm = pmf.getPersistenceManager();
+    Transaction tx = pm.currentTransaction();
+    Query query = null;
+    try {
+      tx.begin();
+      query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE,
+          sql.getCreateTableSql(tableIncarnationName));
+      query.execute();
+      tx.commit();
+    } finally {
+      if (query != null) {
+        query.closeAll();
+      }
+      if (tx.isActive()) {
+        tx.rollback();
+      }
+      pm.close();
+    }
+  }
+
+  /**
+   * Renames one table to another.
+   *
+   * @param pmf                 The PersistenceManagerFactory to use.
+   * @param src                 The table to rename
+   * @param dst                 The new name of the table.
+   *
+   * @throws  JDOException      If there was a JDO error.
+   */
+  private void renameTable(PersistenceManagerFactory pmf, String src,
+          String dst) {
+    boolean success = false;
+    PersistenceManager pm = pmf.getPersistenceManager();
+    Transaction tx = pm.currentTransaction();
+    Query query = null;
+    try {
+      tx.begin();
+      query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE,
+          sql.getRenameTableSql(src, dst));
+      query.execute();
+      tx.commit();
+      success = true;
+    } finally {
+      if (query != null) {
+        query.closeAll();
+      }
+      if (!success) {
+        LOGGER.info("Failed to rename table " + src + " to " + dst);
+        tx.rollback();
+      }
+      pm.close();
+    }
+  }
+
+  /**
+   * Claims the fence table for this incarnation, creating or renaming it as needed.
+   *
+   * @param pmf                 The PersistenceManagerFactory to use.
+   *
+   * @throws  JDOException      If there was a JDO error.
+   */
+  public void fence(PersistenceManagerFactory pmf) {
+    String curTableName = findFencingTable(pmf);
+    if (curTableName == null) {
+      createFenceTable(pmf);
+      LOGGER.info("Created sentry fence table.");
+    } else if (curTableName.equals(tableIncarnationName)) {
+      LOGGER.info("Sentry fence table is already named " +
+          tableIncarnationName);
+    } else {
+      renameTable(pmf, curTableName, tableIncarnationName);
+      LOGGER.info("Renamed sentry fence table " + curTableName + " to " +
+          tableIncarnationName);
+    }
+  }
+
+  /**
+   * Executes a query against this incarnation's fence table on the given PersistenceManager.
+   */
+  void verify(PersistenceManager pm) {
+    Query query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE,
+        sql.getFetchAllRowsSql(tableIncarnationName));
+    query.execute();
+  }
+
+  String getTableIncarnationName() {
+    return tableIncarnationName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
deleted file mode 100644
index cacc29f..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.provider.db.service.persistent;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.imps.DefaultACLProvider;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.sentry.service.thrift.JaasConfiguration;
-import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
-import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-/**
- * Stores the HA related context
- */
-public class HAContext {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(HAContext.class);
-  private static volatile HAContext serverHAContext = null;
-  private static boolean aclChecked = false;
-
-  public final static String SENTRY_SERVICE_REGISTER_NAMESPACE = "sentry-service";
-  public static final String SENTRY_ZK_JAAS_NAME = "SentryClient";
-  private final String zookeeperQuorum;
-  private final int retriesMaxCount;
-  private final int sleepMsBetweenRetries;
-  private final String namespace;
-
-  private final boolean zkSecure;
-  private List<ACL> saslACL;
-
-  private final CuratorFramework curatorFramework;
-  private final RetryPolicy retryPolicy;
-
-  protected HAContext(Configuration conf) throws Exception {
-    this.zookeeperQuorum = conf.get(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
-        ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM_DEFAULT);
-    this.retriesMaxCount = conf.getInt(ServerConfig.SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT,
-        ServerConfig.SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT_DEFAULT);
-    this.sleepMsBetweenRetries = conf.getInt(ServerConfig.SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS,
-        ServerConfig.SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS_DEFAULT);
-    this.namespace = conf.get(ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE,
-        ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE_DEFAULT);
-    this.zkSecure = conf.getBoolean(ServerConfig.SENTRY_HA_ZOOKEEPER_SECURITY,
-        ServerConfig.SENTRY_HA_ZOOKEEPER_SECURITY_DEFAULT);
-    ACLProvider aclProvider;
-    validateConf();
-    if (zkSecure) {
-      LOGGER.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
-      setJaasConfiguration(conf);
-      System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
-          SENTRY_ZK_JAAS_NAME);
-      saslACL = Lists.newArrayList();
-      saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf,
-          ServerConfig.PRINCIPAL))));
-      saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf,
-              ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL))));
-      aclProvider = new SASLOwnerACLProvider();
-      String allowConnect = conf.get(ServerConfig.ALLOW_CONNECT);
-
-      if (!Strings.isNullOrEmpty(allowConnect)) {
-        for (String principal : Arrays.asList(allowConnect.split("\\s*,\\s*"))) {
-          LOGGER.info("Adding acls for " + principal);
-          saslACL.add(new ACL(Perms.ALL, new Id("sasl", principal)));
-        }
-      }
-    } else {
-      LOGGER.info("Connecting to ZooKeeper without authentication");
-      aclProvider = new DefaultACLProvider();
-    }
-
-    retryPolicy = new RetryNTimes(retriesMaxCount, sleepMsBetweenRetries);
-    this.curatorFramework = CuratorFrameworkFactory.builder()
-        .namespace(this.namespace)
-        .connectString(this.zookeeperQuorum)
-        .retryPolicy(retryPolicy)
-        .aclProvider(aclProvider)
-        .build();
-    startCuratorFramework();
-  }
-
-  /**
-   * Use common HAContext (ie curator framework connection to ZK)
-   *
-   * @param conf
-   * @throws Exception
-   */
-  public static HAContext getHAContext(Configuration conf) throws Exception {
-    if (serverHAContext == null) {
-      serverHAContext = new HAContext(conf);
-      Runtime.getRuntime().addShutdownHook(new Thread() {
-        @Override
-        public void run() {
-          LOGGER.info("ShutdownHook closing curator framework");
-          try {
-            clearServerContext();
-          } catch (Throwable t) {
-            LOGGER.error("Error stopping SentryService", t);
-          }
-        }
-      });
-
-    }
-    return serverHAContext;
-  }
-
-  // HA context for server which verifies the ZK ACLs on namespace
-  public static HAContext getHAServerContext(Configuration conf) throws Exception {
-    HAContext serverContext = getHAContext(conf);
-    serverContext.checkAndSetACLs();
-    return serverContext;
-  }
-
-  @VisibleForTesting
-  public static synchronized void clearServerContext() {
-    if (serverHAContext != null) {
-      serverHAContext.getCuratorFramework().close();
-      serverHAContext = null;
-    }
-  }
-
-  public void startCuratorFramework() {
-    if (curatorFramework.getState() != CuratorFrameworkState.STARTED) {
-      curatorFramework.start();
-    }
-  }
-
-  public CuratorFramework getCuratorFramework() {
-    return this.curatorFramework;
-  }
-
-  public String getZookeeperQuorum() {
-    return zookeeperQuorum;
-  }
-
-  public static boolean isHaEnabled(Configuration conf) {
-    return conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED, ServerConfig.SENTRY_HA_ENABLED_DEFAULT);
-  }
-
-  public String getNamespace() {
-    return namespace;
-  }
-
-  public RetryPolicy getRetryPolicy() {
-    return retryPolicy;
-  }
-
-  private void validateConf() {
-    Preconditions.checkNotNull(zookeeperQuorum, "Zookeeper Quorum should not be null.");
-    Preconditions.checkNotNull(namespace, "Zookeeper namespace should not be null.");
-  }
-
-  protected String getServicePrincipal(Configuration conf, String confProperty)
-      throws IOException {
-    String principal = conf.get(confProperty);
-    Preconditions.checkNotNull(principal);
-    Preconditions.checkArgument(principal.length() != 0, "Server principal is not right.");
-    return principal.split("[/@]")[0];
-  }
-
-  private void checkAndSetACLs() throws Exception {
-    if (zkSecure && !aclChecked) {
-      // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
-      // and set the ACLs for them. This is done just once at the startup
-      // We can't get the namespace znode through curator; have to go through zk client
-      startCuratorFramework();
-      String newNamespace = "/" + curatorFramework.getNamespace();
-      if (curatorFramework.getZookeeperClient().getZooKeeper().exists(newNamespace, null) != null) {
-        List<ACL> acls = curatorFramework.getZookeeperClient().getZooKeeper().getACL(newNamespace, new Stat());
-        if (acls.isEmpty() || !acls.get(0).getId().getScheme().equals("sasl")) {
-          LOGGER.info("'sasl' ACLs not set; setting...");
-          List<String> children = curatorFramework.getZookeeperClient().getZooKeeper().getChildren(newNamespace, null);
-          for (String child : children) {
-            checkAndSetACLs("/" + child);
-          }
-          curatorFramework.getZookeeperClient().getZooKeeper().setACL(newNamespace, saslACL, -1);
-        }
-      }
-      aclChecked = true;
-
-    }
-  }
-
-  private void checkAndSetACLs(String path) throws Exception {
-      LOGGER.info("Setting acls on " + path);
-      List<String> children = curatorFramework.getChildren().forPath(path);
-      for (String child : children) {
-        checkAndSetACLs(path + "/" + child);
-      }
-      curatorFramework.setACL().withACL(saslACL).forPath(path);
-  }
-
-  // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
-  private void setJaasConfiguration(Configuration conf) throws IOException {
-    if ("false".equalsIgnoreCase(conf.get(
-          ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE,
-          ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE_DEFAULT))) {
-      String keytabFile = conf.get(ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_KEYTAB);
-      Preconditions.checkArgument(keytabFile.length() != 0, "Keytab File is not right.");
-      String principal = conf.get(ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL);
-      principal = SecurityUtil.getServerPrincipal(principal,
-        conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT));
-      Preconditions.checkArgument(principal.length() != 0, "Kerberos principal is not right.");
-
-      // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to
-      // point to it (but this way we don't have to write a file, and it works better for the tests)
-      JaasConfiguration.addEntryForKeytab(SENTRY_ZK_JAAS_NAME, principal, keytabFile);
-    } else {
-      // Create jaas conf for ticket cache
-      JaasConfiguration.addEntryForTicketCache(SENTRY_ZK_JAAS_NAME);
-    }
-    javax.security.auth.login.Configuration.setConfiguration(JaasConfiguration.getInstance());
-  }
-
-  public class SASLOwnerACLProvider implements ACLProvider {
-    @Override
-    public List<ACL> getDefaultAcl() {
-        return saslACL;
-    }
-
-    @Override
-    public List<ACL> getAclForPath(String path) {
-        return saslACL;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 7dad496..6e367e5 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -71,6 +71,8 @@ import org.apache.sentry.provider.db.service.thrift.TSentryMappingData;
 import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
 import org.apache.sentry.provider.db.service.thrift.TSentryPrivilegeMap;
 import org.apache.sentry.provider.db.service.thrift.TSentryRole;
+import org.apache.sentry.service.thrift.Activator;
+import org.apache.sentry.service.thrift.Activators;
 import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
 import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
 import org.datanucleus.store.rdbms.exceptions.MissingTableException;
@@ -102,7 +104,6 @@ public class SentryStore {
   public static final String NULL_COL = "__NULL__";
   public static int INDEX_GROUP_ROLES_MAP = 0;
   public static int INDEX_USER_ROLES_MAP = 1;
-  static final String DEFAULT_DATA_DIR = "sentry_policy_db";
 
   private static final Set<String> ALL_ACTIONS = Sets.newHashSet(AccessConstants.ALL,
       AccessConstants.SELECT, AccessConstants.INSERT, AccessConstants.ALTER,
@@ -116,6 +117,11 @@ public class SentryStore {
       AccessConstants.ACTION_ALL.toLowerCase(), AccessConstants.SELECT, AccessConstants.INSERT);
 
   /**
+   * The activator object which tells us whether the current daemon is active.
+   */
+  private final Activator act;
+
+  /**
    * Commit order sequence id. This is used by notification handlers
    * to know the order in which events where committed to the database.
    * This instance variable is incremented in incrementGetSequenceId
@@ -128,10 +134,8 @@ public class SentryStore {
   private PrivCleaner privCleaner = null;
   private Thread privCleanerThread = null;
 
-  public SentryStore(Configuration conf) throws SentryNoSuchObjectException,
-  SentryAccessDeniedException, SentrySiteConfigurationException, IOException {
-    commitSequenceId = 0;
-    this.conf = conf;
+  public static Properties getDataNucleusProperties(Configuration conf)
+      throws SentrySiteConfigurationException, IOException {
     Properties prop = new Properties();
     prop.putAll(ServerConfig.SENTRY_STORE_DEFAULTS);
     String jdbcUrl = conf.get(ServerConfig.SENTRY_STORE_JDBC_URL, "").trim();
@@ -164,8 +168,19 @@ public class SentryStore {
         prop.setProperty(key, entry.getValue());
       }
     }
+    // Disallow operations outside of transactions
+    prop.setProperty("datanucleus.NontransactionalRead", "false");
+    prop.setProperty("datanucleus.NontransactionalWrite", "false");
+    return prop;
+  }
 
-
+  public SentryStore(Configuration conf)
+      throws SentryNoSuchObjectException, SentryAccessDeniedException,
+          SentrySiteConfigurationException, IOException {
+    this.act = Activators.INSTANCE.get(conf);
+    commitSequenceId = 0;
+    this.conf = conf;
+    Properties prop = getDataNucleusProperties(conf);
     boolean checkSchemaVersion = conf.get(
         ServerConfig.SENTRY_VERIFY_SCHEM_VERSION,
         ServerConfig.SENTRY_VERIFY_SCHEM_VERSION_DEFAULT).equalsIgnoreCase(
@@ -175,11 +190,6 @@ public class SentryStore {
       prop.setProperty("datanucleus.autoCreateSchema", "true");
       prop.setProperty("datanucleus.fixedDatastore", "false");
     }
-
-    // Disallow operations outside of transactions
-    prop.setProperty("datanucleus.NontransactionalRead", "false");
-    prop.setProperty("datanucleus.NontransactionalWrite", "false");
-
     pmf = JDOHelper.getPersistenceManagerFactory(prop);
     verifySentryStoreSchema(checkSchemaVersion);
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
deleted file mode 100644
index 79dfe48..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.provider.db.service.persistent;
-
-import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
-import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-
-public class ServiceRegister {
-
-  private HAContext haContext;
-
-  public ServiceRegister(HAContext haContext) {
-    this.haContext = haContext;
-  }
-
-  public void regService(String host, int port) throws Exception {
-
-    haContext.startCuratorFramework();
-    ServiceInstance<Void> serviceInstance = ServiceInstance.<Void>builder()
-        .address(host)
-        .port(port)
-        .name(HAContext.SENTRY_SERVICE_REGISTER_NAMESPACE)
-        .build();
-
-    InstanceSerializer<Void> instanceSerializer = new FixedJsonInstanceSerializer<Void>(Void.class);
-    ServiceDiscoveryBuilder.builder(Void.class)
-        .basePath(HAContext.SENTRY_SERVICE_REGISTER_NAMESPACE)
-        .client(haContext.getCuratorFramework())
-        .serializer(instanceSerializer)
-        .thisInstance(serviceInstance)
-        .build()
-        .start();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java
new file mode 100644
index 0000000..9879e67
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import java.sql.Connection;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.datastore.JDOConnection;
+
+/**
+ * An accessor for a SQL database.
+ *
+ * SqlAccessor objects generate raw SQL statements in a variety of dialects.
+ * We use this to do stuff that the DataNucleus architects didn't anticipate,
+ * like rename tables or search for tables by a prefix name.<p/>
+ *
+ * This class exists only to implement fencing.  While it's theoretically
+ * possible to do other things with it, it is almost always better to use the
+ * functionality provided by DataNucleus if it is at all possible.<p/>
+ *
+ * Note: do NOT pass any untrusted user input into these functions.  You must
+ * NOT create SQL statements from unsanitized user input because they may expose
+ * you to SQL injection attacks.  Use prepared statements if you need to do that
+ * (yes, it's possible via DataNucleus.)<p/>
+ */
+abstract class SqlAccessor {
+  /**
+   * The string which we can use with PersistenceManager#newQuery to perform raw
+   * SQL operations.
+   */
+  final static String JDO_SQL_ESCAPE = "javax.jdo.query.SQL";
+
+  /**
+   * Get an accessor for the SQL database that we're using.
+   * @param pmf The PersistenceManagerFactory whose database is inspected.
+   * @return The singleton accessor instance for the SQL database we are using.
+   *
+   * @throws RuntimeException     If there was an error loading the SqlAccessor.
+   *                              This could happen because we don't know the
+   *                              type of database that we're using.  In theory
+   *                              it could also happen because JDO is being run
+   *                              against something that is not a SQL database at
+   *                              all.
+   */
+  static SqlAccessor get(PersistenceManagerFactory pmf) {
+    String productName = getProductNameString(pmf).toLowerCase();
+    if (productName.contains("postgresql")) {
+      return PostgresSqlAccessor.INSTANCE;
+    } else if (productName.contains("mysql")) {
+      return MySqlSqlAccessor.INSTANCE;
+    } else if (productName.contains("oracle")) {
+      return OracleSqlAccessor.INSTANCE;
+    } else if (productName.contains("derby")) {
+      return DerbySqlAccessor.INSTANCE;
+    } else if (productName.contains("db2")) {
+      return Db2SqlAccessor.INSTANCE;
+    } else {
+      throw new RuntimeException("Unknown database type " +
+      "'" + productName + "'.  Supported database types are " +
+      "postgres, mysql, oracle, derby, and db2.");
+    }
+  }
+
+  /**
+   * @return    A string describing the type of database that we're using.
+   */
+  static private String getProductNameString(PersistenceManagerFactory pmf) {
+    PersistenceManager pm = pmf.getPersistenceManager();
+    JDOConnection jdoConn = pm.getDataStoreConnection();
+    try {
+      return ((Connection)jdoConn.getNativeConnection()).getMetaData().
+          getDatabaseProductName();
+    } catch (Throwable t) {
+      throw new RuntimeException("Error retrieving database product name", t);
+    } finally {
+      // Release the connection before any other pm call, then close the pm.
+      jdoConn.close();
+      pm.close();
+    }
+  }
+
+  /**
+   * Get the name of this database.
+   *
+   * @return          The name of this database.
+   */
+  abstract String getDatabaseName();
+
+  /**
+   * Get the SQL for finding a table that starts with the given prefix.
+   *
+   * @param prefix    The prefix of the table to find.
+   * @return          The SQL.
+   */
+  abstract String getFindTableByPrefixSql(String prefix);
+
+  /**
+   * Get the SQL for creating a table with the given name.
+   *
+   * @param name      The name of the table to create.
+   * @return          The SQL.
+   */
+  abstract String getCreateTableSql(String name);
+
+  /**
+   * Get the SQL for renaming a table.
+   *
+   * @param src       The name of the table to rename.
+   * @param dst       The new name to give to the table.
+   * @return          The SQL.
+   */
+  abstract String getRenameTableSql(String src, String dst);
+
+  /**
+   * Get the SQL for fetching all rows from the given table.
+   *
+   * @param name      The table name.
+   * @return          The SQL.
+   */
+  abstract String getFetchAllRowsSql(String name);
+
+  /**
+   * The postgres database type.<p/>
+   *
+   * Postgres is case-sensitive, but will translate all identifiers to
+   * lowercase unless you quote them.  So we quote all identifiers when using
+   * postgres.
+   */
+  private static class PostgresSqlAccessor extends SqlAccessor {
+    static final PostgresSqlAccessor INSTANCE = new PostgresSqlAccessor();
+
+    @Override
+    String getDatabaseName() {
+      return "postgres";
+    }
+
+    @Override
+    String getFindTableByPrefixSql(String prefix) {
+      return "SELECT table_name FROM information_schema.tables " +
+          "WHERE table_name LIKE '" + prefix + "%'";
+    }
+
+    @Override
+    String getCreateTableSql(String name) {
+      return "CREATE TABLE \"" + name + "\" (\"VAL\" VARCHAR(512))";
+    }
+
+    @Override
+    String getRenameTableSql(String src, String dst) {
+      return "ALTER TABLE \"" + src + "\" RENAME TO \"" + dst + "\"";
+    }
+
+    @Override
+    String getFetchAllRowsSql(String tableName) {
+      return "SELECT * FROM \"" + tableName + "\"";
+    }
+  }
+
+  /**
+   * The MySQL database type.<p/>
+   * MySQL can't handle quoted identifiers unless specifically configured to
+   * accept them, so only the LIKE pattern (a string literal) is quoted.
+   */
+  private static class MySqlSqlAccessor extends SqlAccessor {
+    static final MySqlSqlAccessor INSTANCE = new MySqlSqlAccessor();
+
+    @Override
+    String getDatabaseName() {
+      return "mysql";
+    }
+
+    @Override
+    String getFindTableByPrefixSql(String prefix) {
+      return "SELECT table_name FROM information_schema.tables " +
+          "WHERE table_name LIKE '" + prefix + "%'";
+    }
+
+    @Override
+    String getCreateTableSql(String name) {
+      return "CREATE TABLE " + name + " (VAL VARCHAR(512))";
+    }
+
+    @Override
+    String getRenameTableSql(String src, String dst) {
+      return "RENAME TABLE " + src + " TO " + dst;
+    }
+
+    @Override
+    String getFetchAllRowsSql(String tableName) {
+      return "SELECT * FROM " + tableName;
+    }
+  }
+
+  /**
+   * The Oracle database type (renames use ALTER TABLE ... RENAME TO).<p/>
+   */
+  private static class OracleSqlAccessor extends SqlAccessor {
+    static final OracleSqlAccessor INSTANCE = new OracleSqlAccessor();
+
+    @Override
+    String getDatabaseName() {
+      return "oracle";
+    }
+
+    @Override
+    String getFindTableByPrefixSql(String prefix) {
+      return "SELECT table_name FROM all_tables " +
+          "WHERE table_name LIKE '" + prefix + "%'";
+    }
+
+    @Override
+    String getCreateTableSql(String name) {
+      return "CREATE TABLE " + name + " (VAL VARCHAR(512))";
+    }
+
+    @Override
+    String getRenameTableSql(String src, String dst) {
+      return "ALTER TABLE " + src + " RENAME TO " + dst;
+    }
+
+    @Override
+    String getFetchAllRowsSql(String tableName) {
+      return "SELECT * FROM " + tableName;
+    }
+  }
+
+  /**
+   * The Derby database type.<p/>
+   */
+  private static class DerbySqlAccessor extends SqlAccessor {
+    static final DerbySqlAccessor INSTANCE = new DerbySqlAccessor();
+
+    @Override
+    String getFindTableByPrefixSql(String prefix) {
+      return "SELECT tablename FROM SYS.SYSTABLES " +
+          "WHERE tablename LIKE '" + prefix + "%'";
+    }
+
+    @Override
+    String getCreateTableSql(String name) {
+      return "CREATE TABLE " + name + " (VAL VARCHAR(512))";
+    }
+
+    @Override
+    String getRenameTableSql(String src, String dst) {
+      return "RENAME TABLE " + src + " TO " + dst;
+    }
+
+    @Override
+    String getDatabaseName() {
+      return "derby";
+    }
+
+    @Override
+    String getFetchAllRowsSql(String tableName) {
+      return "SELECT * FROM " + tableName;
+    }
+  }
+
+  /**
+   * The DB2 database type (tables are listed in SYSIBM.SYSTABLES).<p/>
+   */
+  private static class Db2SqlAccessor extends SqlAccessor {
+    static final Db2SqlAccessor INSTANCE = new Db2SqlAccessor();
+
+    @Override
+    String getFindTableByPrefixSql(String prefix) {
+      return "SELECT name FROM SYSIBM.SYSTABLES " +
+          "WHERE name LIKE '" + prefix + "%'";
+    }
+
+    @Override
+    String getCreateTableSql(String name) {
+      return "CREATE TABLE " + name + " (VAL VARCHAR(512))";
+    }
+
+    @Override
+    String getRenameTableSql(String src, String dst) {
+      return "RENAME TABLE " + src + " TO " + dst;
+    }
+
+    @Override
+    String getDatabaseName() {
+      return "db2";
+    }
+
+    @Override
+    String getFetchAllRowsSql(String tableName) {
+      return "SELECT * FROM " + tableName;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index 3de1f65..19daa75 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -46,9 +46,7 @@ import org.apache.sentry.provider.db.log.entity.JsonLogEntity;
 import org.apache.sentry.provider.db.log.entity.JsonLogEntityFactory;
 import org.apache.sentry.provider.db.log.util.Constants;
 import org.apache.sentry.provider.db.service.persistent.CommitContext;
-import org.apache.sentry.provider.db.service.persistent.HAContext;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.persistent.ServiceRegister;
 import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants.PolicyStoreServerConfig;
 import org.apache.sentry.service.thrift.SentryServiceUtil;
 import org.apache.sentry.service.thrift.ServiceConstants;
@@ -84,30 +82,18 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
   private final SentryStore sentryStore;
   private final NotificationHandlerInvoker notificationHandlerInvoker;
   private final ImmutableSet<String> adminGroups;
-  private boolean isReady;
   SentryMetrics sentryMetrics;
-  private HAContext haContext;
 
   private List<SentryPolicyStorePlugin> sentryPlugins = new LinkedList<SentryPolicyStorePlugin>();
 
-  public SentryPolicyStoreProcessor(String name, Configuration conf) throws Exception {
+  public SentryPolicyStoreProcessor(String name,
+        Configuration conf) throws Exception {
     super();
     this.name = name;
     this.conf = conf;
     this.notificationHandlerInvoker = new NotificationHandlerInvoker(conf,
         createHandlers(conf));
-    isReady = false;
-    if (conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
-        ServerConfig.SENTRY_HA_ENABLED_DEFAULT)) {
-      haContext = HAContext.getHAServerContext(conf);
-      sentryStore = new SentryStore(conf);
-      ServiceRegister reg = new ServiceRegister(haContext);
-      reg.regService(conf.get(ServerConfig.RPC_ADDRESS),
-          conf.getInt(ServerConfig.RPC_PORT,ServerConfig.RPC_PORT_DEFAULT));
-    } else {
-      sentryStore = new SentryStore(conf);
-    }
-    isReady = true;
+    sentryStore = new SentryStore(conf);
     adminGroups = ImmutableSet.copyOf(toTrimedLower(Sets.newHashSet(conf.getStrings(
         ServerConfig.ADMIN_GROUPS, new String[]{}))));
     Iterable<String> pluginClasses = ConfUtilties.CLASS_SPLITTER
@@ -149,16 +135,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
   }
 
   public void stop() {
-    if (isReady) {
-      sentryStore.stop();
-    }
-    if (haContext != null) {
-      try {
-        haContext.getCuratorFramework().close();
-      } catch (Exception e) {
-        LOGGER.warn("Error in stopping processor", e);
-      }
-    }
+    sentryStore.stop();
   }
 
   public void registerPlugin(SentryPolicyStorePlugin plugin) throws SentryPluginException {

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java
new file mode 100644
index 0000000..0b7ddf5
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.provider.db.service.persistent.Fencer;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.JDOHelper;
+import javax.jdo.PersistenceManagerFactory;
+
+/**
+ * The activator is used to access and modify the activation state of the sentry daemon.<p/>
+ */
+public class Activator implements Closeable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Activator.class);
+
+  /**
+   * The DataNucleus PersistenceManagerFactory to use.
+   */
+  private final PersistenceManagerFactory pmf;
+
+  /**
+   * The handler for LeaderStatus callbacks.
+   */
+  private final TransitionHandler handler;
+
+  /**
+   * LeaderStatus generates callbacks to let us know when we are active or
+   * standby.  When HA is enabled, it manages ZK sessions.
+   */
+  private final LeaderStatus leaderStatus;
+
+  /**
+   * The fencer object.
+   */
+  private final Fencer fencer;
+
+  /**
+   * True if the Activator is active.
+   */
+  private boolean active;
+
+  public Activator(Configuration conf) throws Exception {
+    Properties props = SentryStore.getDataNucleusProperties(conf);
+    this.pmf = JDOHelper.getPersistenceManagerFactory(props);
+    this.handler = new TransitionHandler();
+    this.leaderStatus = new LeaderStatus(handler, conf);
+    this.fencer = new Fencer(this.leaderStatus.getIncarnationId(), pmf);
+    this.active = false;
+    this.leaderStatus.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.leaderStatus.close();
+    this.pmf.close();
+  }
+
+  private class TransitionHandler implements LeaderStatus.Listener {
+    @Override
+    public void becomeActive() throws Exception {
+      synchronized (Activator.this) {
+        if (!active) {
+          LOGGER.info("Activating " + leaderStatus.getIncarnationId());
+          fencer.fence(pmf);
+          active = true;
+        }
+      }
+    }
+
+    @Override
+    public void becomeStandby() {
+      synchronized (Activator.this) {
+        if (active) {
+          LOGGER.info("Deactivating " + leaderStatus.getIncarnationId());
+          active = false;
+        }
+      }
+    }
+  }
+
+  synchronized boolean isActive() {
+    return active;
+  }
+
+  public synchronized String getIncarnationId() {
+    return leaderStatus.getIncarnationId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java
new file mode 100644
index 0000000..37b0219
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.utils.SentryConstants;
+
+/**
+ * A global map from incarnation IDs to Activator objects.<p/>
+ *
+ * This is used to access the current global Activator.  Normally there will
+ * only be one Activator used in a sentry process.  There may be multiple
+ * Activator objects in the case where we are running unit tests.
+ */
+public enum Activators {
+  INSTANCE;
+
+  private final HashMap<String, Activator> acts = new HashMap<String, Activator>();
+
+  Activators() {}
+
+  public synchronized void put(Activator act) {
+    acts.put(act.getIncarnationId(), act);
+  }
+
+  public Activator get(Configuration conf) {
+    String key = conf.get(SentryConstants.CURRENT_INCARNATION_ID_KEY);
+    if (key == null) {
+      throw new RuntimeException("No " +
+          SentryConstants.CURRENT_INCARNATION_ID_KEY + "set.");
+    }
+    return get(key);
+  }
+
+  public synchronized Activator get(String incarnationId) {
+    Activator act = acts.get(incarnationId);
+    if (act == null) {
+      throw new RuntimeException("No activator found with " +
+          "incarnationId " + incarnationId);
+    }
+    return act;
+  }
+
+  public synchronized void remove(Activator act) {
+    Activator removed = acts.remove(act.getIncarnationId());
+    if (removed == null) {
+      throw new RuntimeException("No activator found with " +
+          "incarnationId " + act.getIncarnationId());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java
index e846766..e32e1db 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java
@@ -16,10 +16,10 @@
  */
 package org.apache.sentry.service.thrift;
 
-import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.datanucleus.util.Base64;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -79,13 +79,36 @@ final class LeaderStatus implements Closeable {
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
   /**
-   * Generate a 128-bit random ID.
+   * Generate a long random ID from 33 random bytes encoded as base64.
+   *
+   * We want a name that doesn't start with a number, and which
+   * contains only letters and numbers.  This is important because
+   * the incarnation ID gets used in SQL databases to name a table.
    */
   static String generateIncarnationId() {
     SecureRandom srand = new SecureRandom();
-    byte[] buf = new byte[32];
+    byte[] buf = new byte[33];
     srand.nextBytes(buf);
-    return "sentry_" + Hex.encodeHexString(buf);
+    char[] cbuf = Base64.encode(buf);
+    StringBuilder bld = new StringBuilder();
+    for (int i = 0; i < cbuf.length; i++) {
+      boolean safe;
+      if (i == 0) {
+        // Some databases can't handle identifiers that start with numbers,
+        // so always start with a letter.  Also replace '+' or '/' with
+        // something safe.
+        safe = Character.isLetter(cbuf[i]);
+      } else {
+        // Replace '+' or '/' with something safe.
+        safe = Character.isLetterOrDigit(cbuf[i]);
+      }
+      if (!safe) {
+        bld.append((char)('a' + srand.nextInt(26)));
+      } else {
+        bld.append(cbuf[i]);
+      }
+    }
+    return bld.toString();
   }
 
   LeaderStatus(Listener listener, Configuration conf) throws Exception {

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java
index 80a6571..33a5e7b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java
@@ -81,6 +81,11 @@ final class LeaderStatusAdaptor
   private long becomeLeaderCount = 0;
 
   /**
+   * True only if this LeaderStatusAdaptor is closed.
+   */
+  private boolean isClosed = false;
+
+  /**
    * True only if this incarnation is currently active.
    */
   private boolean isActive = false;
@@ -112,9 +117,36 @@ final class LeaderStatusAdaptor
     this.leaderSelector.start();
   }
 
+  /**
+   * Shut down the LeaderStatusAdaptor and wait for it to transition to
+   * standby.
+   */
   @Override
   public void close() throws IOException {
+    // If the adaptor is already closed, calling close again is a no-op.
+    // Setting isClosed also prevents activation after this point.
+    lock.lock();
+    try {
+      if (isClosed) {
+        return;
+      }
+      isClosed = true;
+    } finally {
+      lock.unlock();
+    }
+
+    // Shut down our Curator hooks.
     leaderSelector.close();
+
+    // Wait for the adaptor to transition to standby state.
+    lock.lock();
+    try {
+      while (isActive) {
+        cond.awaitUninterruptibly();
+      }
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -148,9 +180,14 @@ final class LeaderStatusAdaptor
   public void takeLeadership(CuratorFramework client) throws Exception {
     lock.lock();
     try {
+      if (isClosed) {
+        LOG.info("LeaderStatusAdaptor: can't become active because the " +
+            "adaptor is closed.");
+        return;
+      }
       isActive = true;
       becomeLeaderCount++;
-      LOG.info("SentryLeaderSelectorClient: becoming active.  " +
+      LOG.info("LeaderStatusAdaptor: becoming active.  " +
           "becomeLeaderCount=" + becomeLeaderCount);
       listener.becomeActive();
       while (isActive) {
@@ -158,7 +195,7 @@ final class LeaderStatusAdaptor
       }
     } finally {
       isActive = false;
-      LOG.info("SentryLeaderSelectorClient: becoming standby");
+      LOG.info("LeaderStatusAdaptor: becoming standby");
       try {
         listener.becomeStandby();
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index 809af06..531ab35 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.sentry.Command;
+import org.apache.sentry.core.common.utils.SentryConstants;
 import org.apache.sentry.provider.db.service.thrift.SentryHealthCheckServletContextListener;
 import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
 import org.apache.sentry.provider.db.service.thrift.SentryMetricsServletContextListener;
@@ -95,11 +96,10 @@ public class SentryService implements Callable {
   private SentryWebServer sentryWebServer;
   private long maxMessageSize;
   private final boolean isHA;
-  private volatile boolean isActive = false;
+  private final Activator act;
   SentryMetrics sentryMetrics;
-  private final LeaderStatus leaderStatus;
 
-  public SentryService(Configuration conf) {
+  public SentryService(Configuration conf) throws Exception {
     this.conf = conf;
     int port = conf
         .getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
@@ -153,25 +153,10 @@ public class SentryService implements Callable {
             + (count++));
       }
     });
-    try {
-      leaderStatus = new LeaderStatus(
-          new LeaderStatus.Listener() {
-            @Override
-            public void becomeActive() throws Exception {
-              LOGGER.info("Activating " + leaderStatus.getIncarnationId());
-              isActive = true;
-            }
-
-            @Override
-            public void becomeStandby() {
-              LOGGER.info("Deactivating " + leaderStatus.getIncarnationId());
-              isActive = false;
-            }
-          }, conf);
-      leaderStatus.start(); // TODO: move this into call?
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    this.act = new Activator(conf);
+    conf.set(SentryConstants.CURRENT_INCARNATION_ID_KEY,
+        this.act.getIncarnationId());
+    Activators.INSTANCE.put(act);
     webServerPort = conf.getInt(ServerConfig.SENTRY_WEB_PORT, ServerConfig.SENTRY_WEB_PORT_DEFAULT);
     status = Status.NOT_STARTED;
   }
@@ -307,7 +292,7 @@ public class SentryService implements Callable {
   public synchronized void stop() throws Exception{
     MultiException exception = null;
     LOGGER.info("Attempting to stop...");
-    leaderStatus.close();
+    act.close();
     if (isRunning()) {
       LOGGER.info("Attempting to stop sentry thrift service...");
       try {
@@ -462,7 +447,7 @@ public class SentryService implements Callable {
     return new Gauge<Boolean>() {
       @Override
       public Boolean getValue() {
-        return isActive;
+        return act.isActive();
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index 0ab8192..abc3f58 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -141,6 +141,7 @@ public class ServiceConstants {
         .put("datanucleus.transactionIsolation", "read-committed")
         .put("datanucleus.cache.level2", "false")
         .put("datanucleus.cache.level2.type", "none")
+        .put("datanucleus.query.sql.allowAll", "true")
         .put("datanucleus.identifierFactory", "datanucleus1")
         .put("datanucleus.rdbms.useLegacyNativeValueStrategy", "true")
         .put("datanucleus.plugin.pluginRegistryBundleCheck", "LOG")
@@ -258,4 +259,7 @@ public class ServiceConstants {
     TABLE,
     COLUMN
   }
+
+  public static final String SENTRY_ZK_JAAS_NAME = "Sentry";
+  public static final String CURRENT_INCARNATION_ID_KEY = "current.incarnation.key";
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java
index f14b586..5999580 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java
@@ -21,7 +21,11 @@ import java.io.File;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.service.thrift.Activator;
+import org.apache.sentry.service.thrift.Activators;
+import org.apache.sentry.service.thrift.ServiceConstants;
 import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -34,6 +38,7 @@ public abstract class SentryStoreIntegrationBase {
   private static File dataDir;
   private static File policyFilePath;
   protected static Configuration conf;
+  protected static Activator act;
   protected static DelegateSentryStore sentryStore;
   protected static PolicyFile policyFile;
 
@@ -57,6 +62,9 @@ public abstract class SentryStoreIntegrationBase {
     policyFilePath = new File(Files.createTempDir(), "local_policy_file.ini");
     conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE,
         policyFilePath.getPath());
+    act = new Activator(conf);
+		conf.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY, act.getIncarnationId());
+    Activators.INSTANCE.put(act);
   }
 
   @After
@@ -66,6 +74,9 @@ public abstract class SentryStoreIntegrationBase {
 
   @AfterClass
   public static void teardown() {
+    if (act != null) {
+      IOUtils.cleanup(null, act);
+    }
     if (sentryStore != null) {
       sentryStore.close();
     }
@@ -75,6 +86,10 @@ public abstract class SentryStoreIntegrationBase {
     if (policyFilePath != null) {
       FileUtils.deleteQuietly(policyFilePath);
     }
+    if (act != null) {
+      Activators.INSTANCE.remove(act);
+      act = null;
+    }
   }
 
   public static void addGroupsToUser(String user, String... groupNames) {

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java
index 799d5ef..7c66db4 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java
@@ -37,6 +37,8 @@ import org.apache.sentry.core.model.sqoop.SqoopActionConstant;
 import org.apache.sentry.core.common.exception.SentryGrantDeniedException;
 import org.apache.sentry.provider.db.generic.service.persistent.PrivilegeObject.Builder;
 import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.service.thrift.Activator;
+import org.apache.sentry.service.thrift.Activators;
 import org.apache.sentry.service.thrift.ServiceConstants;
 import org.junit.Before;
 import org.junit.Test;
@@ -987,8 +989,14 @@ public class TestPrivilegeOperatePersistence extends SentryStoreIntegrationBase
     Configuration confCopy = new Configuration(conf);
     confCopy.set(String.format(ServiceConstants.ServerConfig.SENTRY_COMPONENT_ACTION_FACTORY_FORMAT, externalComponent),
                  InvalidActionFactory.class.getName());
-    SentryStoreLayer store = new DelegateSentryStore(confCopy);
+    Activator act = new Activator(confCopy);
+    confCopy.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY,
+                 act.getIncarnationId());
+    Activators.INSTANCE.put(act);
+   SentryStoreLayer store = new DelegateSentryStore(confCopy);
     testGrantPrivilege(store, externalComponent);
+    act.close();
+    Activators.INSTANCE.remove(act);
   }
 
   @Test
@@ -997,8 +1005,14 @@ public class TestPrivilegeOperatePersistence extends SentryStoreIntegrationBase
     Configuration confCopy = new Configuration(conf);
     confCopy.set(String.format(ServiceConstants.ServerConfig.SENTRY_COMPONENT_ACTION_FACTORY_FORMAT, externalComponent),
                  MyComponentActionFactory.class.getName());
+    Activator act = new Activator(confCopy);
+    confCopy.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY,
+                 act.getIncarnationId());
+    Activators.INSTANCE.put(act);
     SentryStoreLayer store = new DelegateSentryStore(confCopy);
     testGrantPrivilege(store, externalComponent);
+    act.close();
+    Activators.INSTANCE.remove(act);
   }
 
   @Test
@@ -1007,8 +1021,14 @@ public class TestPrivilegeOperatePersistence extends SentryStoreIntegrationBase
     Configuration confCopy = new Configuration(conf);
     confCopy.set(String.format(ServiceConstants.ServerConfig.SENTRY_COMPONENT_ACTION_FACTORY_FORMAT, "mycomponent"),
                  MyComponentActionFactory.class.getName());
+    Activator act = new Activator(confCopy);
+    confCopy.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY,
+                 act.getIncarnationId());
+    Activators.INSTANCE.put(act);
     SentryStoreLayer store = new DelegateSentryStore(confCopy);
     testGrantPrivilege(store, externalComponent);
+    act.close();
+    Activators.INSTANCE.remove(act);
   }
 
   private void testGrantPrivilege(SentryStoreLayer sentryStore, String component) throws SentryUserException {

http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index 3ef1cf7..6e00505 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.alias.CredentialProvider;
 import org.apache.hadoop.security.alias.CredentialProviderFactory;
 import org.apache.hadoop.security.alias.UserProvider;
@@ -44,6 +45,9 @@ import org.apache.sentry.provider.db.service.thrift.TSentryGroup;
 import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
 import org.apache.sentry.provider.db.service.thrift.TSentryRole;
 import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.service.thrift.Activator;
+import org.apache.sentry.service.thrift.Activators;
+import org.apache.sentry.service.thrift.ServiceConstants;
 import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -67,6 +71,7 @@ public class TestSentryStore extends org.junit.Assert {
   final long NUM_PRIVS = 60;  // > SentryStore.PrivCleaner.NOTIFY_THRESHOLD
   private static Configuration conf = null;
   private static char[] passwd = new char[] { '1', '2', '3'};
+  private static Activator act;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -89,6 +94,10 @@ public class TestSentryStore extends org.junit.Assert {
     policyFilePath = new File(dataDir, "local_policy_file.ini");
     conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE,
         policyFilePath.getPath());
+    act = new Activator(conf);
+    conf.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY,
+             act.getIncarnationId());
+    Activators.INSTANCE.put(act);
     sentryStore = new SentryStore(conf);
   }
 
@@ -107,12 +116,17 @@ public class TestSentryStore extends org.junit.Assert {
 
   @AfterClass
   public static void teardown() {
+    IOUtils.cleanup(null, act);
     if (sentryStore != null) {
       sentryStore.stop();
     }
     if (dataDir != null) {
       FileUtils.deleteQuietly(dataDir);
     }
+    if (act != null) {
+      Activators.INSTANCE.remove(act);
+      act = null;
+    }
   }
 
   @Test


Mime
View raw message