activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [3/4] activemq-artemis git commit: ARTEMIS-1447 JDBC NodeManager to support JDBC HA Shared Store
Date Thu, 26 Oct 2017 19:41:20 GMT
ARTEMIS-1447 JDBC NodeManager to support JDBC HA Shared Store


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/09a5d6f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/09a5d6f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/09a5d6f1

Branch: refs/heads/master
Commit: 09a5d6f1c6ba116fb9268ca26c090b648e6a3370
Parents: 9092221
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Sat Sep 9 18:41:30 2017 +0200
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Oct 26 15:38:37 2017 -0400

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |  27 +-
 .../jdbc/store/sql/GenericSQLProvider.java      | 182 ++++++++-
 .../artemis/jdbc/store/sql/SQLProvider.java     |  39 +-
 artemis-server/pom.xml                          |   7 +-
 .../storage/DatabaseStorageConfiguration.java   |  40 ++
 .../deployers/impl/FileConfigurationParser.java |   7 +-
 .../artemis/core/server/NodeManager.java        |   2 +-
 .../core/server/impl/ActiveMQServerImpl.java    |  13 +-
 .../impl/jdbc/ActiveMQScheduledLeaseLock.java   | 115 ++++++
 .../core/server/impl/jdbc/JdbcLeaseLock.java    | 277 ++++++++++++++
 .../core/server/impl/jdbc/JdbcNodeManager.java  | 380 +++++++++++++++++++
 .../impl/jdbc/JdbcSharedStateManager.java       | 302 +++++++++++++++
 .../core/server/impl/jdbc/LeaseLock.java        | 151 ++++++++
 .../server/impl/jdbc/ScheduledLeaseLock.java    |  44 +++
 .../server/impl/jdbc/SharedStateManager.java    |  60 +++
 .../resources/schema/artemis-configuration.xsd  |  21 +
 .../server/impl/jdbc/JdbcLeaseLockTest.java     | 231 +++++++++++
 docs/user-manual/en/persistence.md              |  15 +
 18 files changed, 1896 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 36891c6..a409ffb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -19,9 +19,9 @@ package org.apache.activemq.artemis.api.config;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
 
 /**
@@ -438,8 +438,17 @@ public final class ActiveMQDefaultConfiguration {
    // Default large messages table name, used with Database storage type
    private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE";
 
+   // Default node manager store table name, used with Database storage type
+   private static final String DEFAULT_NODE_MANAGER_STORE_TABLE_NAME = "NODE_MANAGER_STORE";
+
    private static final int DEFAULT_JDBC_NETWORK_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
 
+   private static final long DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS = TimeUnit.SECONDS.toMillis(4);
+
+   private static final long DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(20);
+
+   private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
+
    // Default period to wait between connection TTL checks
    public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;
 
@@ -1211,10 +1220,26 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_PAGE_STORE_TABLE_NAME;
    }
 
+   public static String getDefaultNodeManagerStoreTableName() {
+      return DEFAULT_NODE_MANAGER_STORE_TABLE_NAME;
+   }
+
    public static int getDefaultJdbcNetworkTimeout() {
       return DEFAULT_JDBC_NETWORK_TIMEOUT;
    }
 
+   public static long getDefaultJdbcLockRenewPeriodMillis() {
+      return DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS;
+   }
+
+   public static long getDefaultJdbcLockExpirationMillis() {
+      return DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS;
+   }
+
+   public static long getDefaultJdbcLockAcquisitionTimeoutMillis() {
+      return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS;
+   }
+
    public static long getDefaultConnectionTtlCheckInterval() {
       return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
index 9232001..ac793d3 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
@@ -18,6 +18,14 @@ package org.apache.activemq.artemis.jdbc.store.sql;
 
 public class GenericSQLProvider implements SQLProvider {
 
+   /**
+    * The JDBC Node Manager shared state is contained in these 4 rows: each one is used exclusively for a specific purpose.
+    */
+   private static final int STATE_ROW_ID = 0;
+   private static final int LIVE_LOCK_ROW_ID = 1;
+   private static final int BACKUP_LOCK_ROW_ID = 2;
+   private static final int NODE_ID_ROW_ID = 3;
+
    // Default to lowest (MYSQL = 64k)
    private static final long MAX_BLOB_SIZE = 64512;
 
@@ -57,6 +65,42 @@ public class GenericSQLProvider implements SQLProvider {
 
    private final String countJournalRecordsSQL;
 
+   private final String createNodeManagerStoreTableSQL;
+
+   private final String createStateSQL;
+
+   private final String createNodeIdSQL;
+
+   private final String createLiveLockSQL;
+
+   private final String createBackupLockSQL;
+
+   private final String tryAcquireLiveLockSQL;
+
+   private final String tryAcquireBackupLockSQL;
+
+   private final String tryReleaseLiveLockSQL;
+
+   private final String tryReleaseBackupLockSQL;
+
+   private final String isLiveLockedSQL;
+
+   private final String isBackupLockedSQL;
+
+   private final String renewLiveLockSQL;
+
+   private final String renewBackupLockSQL;
+
+   private final String currentTimestampSQL;
+
+   private final String writeStateSQL;
+
+   private final String readStateSQL;
+
+   private final String writeNodeIdSQL;
+
+   private final String readNodeIdSQL;
+
    protected final DatabaseStoreType databaseStoreType;
 
    protected GenericSQLProvider(String tableName, DatabaseStoreType databaseStoreType) {
@@ -64,8 +108,7 @@ public class GenericSQLProvider implements SQLProvider {
 
       this.databaseStoreType = databaseStoreType;
 
-      createFileTableSQL = "CREATE TABLE " + tableName +
-         "(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
+      createFileTableSQL = "CREATE TABLE " + tableName + "(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
 
       insertFileSQL = "INSERT INTO " + tableName + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
 
@@ -81,17 +124,13 @@ public class GenericSQLProvider implements SQLProvider {
 
       updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
 
-      cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
-         "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
+      cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " + "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
 
       copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
 
       dropFileTableSQL = "DROP TABLE " + tableName;
 
-      createJournalTableSQL = new String[] {
-         "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))",
-         "CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"
-      };
+      createJournalTableSQL = new String[]{"CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))", "CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"};
 
       insertJournalRecordsSQL = "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
 
@@ -102,6 +141,43 @@ public class GenericSQLProvider implements SQLProvider {
       deleteJournalTxRecordsSQL = "DELETE FROM " + tableName + " WHERE txId=?";
 
       countJournalRecordsSQL = "SELECT COUNT(*) FROM " + tableName;
+
+      createNodeManagerStoreTableSQL = "CREATE TABLE " + tableName + " ( ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))";
+
+      createStateSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + STATE_ROW_ID + ")";
+
+      createNodeIdSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + NODE_ID_ROW_ID + ")";
+
+      createLiveLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + LIVE_LOCK_ROW_ID + ")";
+
+      createBackupLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + BACKUP_LOCK_ROW_ID + ")";
+
+      tryAcquireLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + LIVE_LOCK_ROW_ID;
+
+      tryAcquireBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + BACKUP_LOCK_ROW_ID;
+
+      tryReleaseLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID;
+
+      tryReleaseBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID;
+
+      isLiveLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + LIVE_LOCK_ROW_ID;
+
+      isBackupLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + BACKUP_LOCK_ROW_ID;
+
+      renewLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID;
+
+      renewBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID;
+
+      currentTimestampSQL = "SELECT CURRENT_TIMESTAMP FROM " + tableName;
+
+      writeStateSQL = "UPDATE " + tableName + " SET STATE = ? WHERE ID = " + STATE_ROW_ID;
+
+      readStateSQL = "SELECT STATE FROM " + tableName + " WHERE ID = " + STATE_ROW_ID;
+
+      writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID;
+
+      readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID;
+
    }
 
    @Override
@@ -202,6 +278,96 @@ public class GenericSQLProvider implements SQLProvider {
    }
 
    @Override
+   public String createNodeManagerStoreTableSQL() {
+      return createNodeManagerStoreTableSQL;
+   }
+
+   @Override
+   public String createStateSQL() {
+      return createStateSQL;
+   }
+
+   @Override
+   public String createNodeIdSQL() {
+      return createNodeIdSQL;
+   }
+
+   @Override
+   public String createLiveLockSQL() {
+      return createLiveLockSQL;
+   }
+
+   @Override
+   public String createBackupLockSQL() {
+      return createBackupLockSQL;
+   }
+
+   @Override
+   public String tryAcquireLiveLockSQL() {
+      return tryAcquireLiveLockSQL;
+   }
+
+   @Override
+   public String tryAcquireBackupLockSQL() {
+      return tryAcquireBackupLockSQL;
+   }
+
+   @Override
+   public String tryReleaseLiveLockSQL() {
+      return tryReleaseLiveLockSQL;
+   }
+
+   @Override
+   public String tryReleaseBackupLockSQL() {
+      return tryReleaseBackupLockSQL;
+   }
+
+   @Override
+   public String isLiveLockedSQL() {
+      return isLiveLockedSQL;
+   }
+
+   @Override
+   public String isBackupLockedSQL() {
+      return isBackupLockedSQL;
+   }
+
+   @Override
+   public String renewLiveLockSQL() {
+      return renewLiveLockSQL;
+   }
+
+   @Override
+   public String renewBackupLockSQL() {
+      return renewBackupLockSQL;
+   }
+
+   @Override
+   public String currentTimestampSQL() {
+      return currentTimestampSQL;
+   }
+
+   @Override
+   public String writeStateSQL() {
+      return writeStateSQL;
+   }
+
+   @Override
+   public String readStateSQL() {
+      return readStateSQL;
+   }
+
+   @Override
+   public String writeNodeIdSQL() {
+      return writeNodeIdSQL;
+   }
+
+   @Override
+   public String readNodeIdSQL() {
+      return readNodeIdSQL;
+   }
+
+   @Override
    public boolean closeConnectionOnShutdown() {
       return true;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
index 1663179..b4b55d5 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.sql;
 public interface SQLProvider {
 
    enum DatabaseStoreType {
-      PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE
+      PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE, NODE_MANAGER
    }
 
    long getMaxBlobSize();
@@ -62,7 +62,44 @@ public interface SQLProvider {
 
    boolean closeConnectionOnShutdown();
 
+   String createNodeManagerStoreTableSQL();
+
+   String createStateSQL();
+
+   String createNodeIdSQL();
+
+   String createLiveLockSQL();
+
+   String createBackupLockSQL();
+
+   String tryAcquireLiveLockSQL();
+
+   String tryAcquireBackupLockSQL();
+
+   String tryReleaseLiveLockSQL();
+
+   String tryReleaseBackupLockSQL();
+
+   String isLiveLockedSQL();
+
+   String isBackupLockedSQL();
+
+   String renewLiveLockSQL();
+
+   String renewBackupLockSQL();
+
+   String currentTimestampSQL();
+
+   String writeStateSQL();
+
+   String readStateSQL();
+
+   String writeNodeIdSQL();
+
+   String readNodeIdSQL();
+
    interface Factory {
+
       SQLProvider create(String tableName, DatabaseStoreType dbStoreType);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index 1aec5b7..3379e88 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -117,7 +117,12 @@
          <scope>test</scope>
          <type>test-jar</type>
       </dependency>
-
+      <!-- db test -->
+      <dependency>
+         <groupId>org.apache.derby</groupId>
+         <artifactId>derby</artifactId>
+         <scope>test</scope>
+      </dependency>
    </dependencies>
 
    <profiles>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
index 76626c0..b2982a1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
@@ -32,6 +32,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
 
    private String pageStoreTableName = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName();
 
+   private String nodeManagerStoreTableName = ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName();
+
    private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
 
    private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
@@ -42,6 +44,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
 
    private int jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout();
 
+   private long jdbcLockRenewPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
+
+   private long jdbcLockExpirationMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
+
+   private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
+
    @Override
    public StoreType getStoreType() {
       return StoreType.DATABASE;
@@ -75,6 +83,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
       return pageStoreTableName;
    }
 
+   public void setNodeManagerStoreTableName(String nodeManagerStoreTableName) {
+      this.nodeManagerStoreTableName = nodeManagerStoreTableName;
+   }
+
+   public String getNodeManagerStoreTableName() {
+      return nodeManagerStoreTableName;
+   }
+
    public void setPageStoreTableName(String pageStoreTableName) {
       this.pageStoreTableName = pageStoreTableName;
    }
@@ -135,4 +151,28 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
    public void setJdbcNetworkTimeout(int jdbcNetworkTimeout) {
       this.jdbcNetworkTimeout = jdbcNetworkTimeout;
    }
+
+   public long getJdbcLockRenewPeriodMillis() {
+      return jdbcLockRenewPeriodMillis;
+   }
+
+   public void setJdbcLockRenewPeriodMillis(long jdbcLockRenewPeriodMillis) {
+      this.jdbcLockRenewPeriodMillis = jdbcLockRenewPeriodMillis;
+   }
+
+   public long getJdbcLockExpirationMillis() {
+      return jdbcLockExpirationMillis;
+   }
+
+   public void setJdbcLockExpirationMillis(long jdbcLockExpirationMillis) {
+      this.jdbcLockExpirationMillis = jdbcLockExpirationMillis;
+   }
+
+   public long getJdbcLockAcquisitionTimeoutMillis() {
+      return jdbcLockAcquisitionTimeoutMillis;
+   }
+
+   public void setJdbcLockAcquisitionTimeoutMillis(long jdbcLockAcquisitionTimeoutMillis) {
+      this.jdbcLockAcquisitionTimeoutMillis = jdbcLockAcquisitionTimeoutMillis;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index bf92840..e8eee49 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -31,8 +31,6 @@ import java.util.Set;
 
 import org.apache.activemq.artemis.ArtemisConstants;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.core.config.TransformerConfiguration;
-import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
 import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
@@ -52,6 +50,7 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.config.ha.ColocatedPolicyConfiguration;
 import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
@@ -84,6 +83,7 @@ import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
 import org.apache.activemq.artemis.utils.SensitiveDataCodec;
 import org.apache.activemq.artemis.utils.XMLConfigurationUtil;
 import org.apache.activemq.artemis.utils.XMLUtil;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
 import org.w3c.dom.Element;
 import org.w3c.dom.NamedNodeMap;
 import org.w3c.dom.Node;
@@ -1422,6 +1422,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
       conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
       conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
+      conf.setJdbcLockAcquisitionTimeoutMillis(getLong(storeNode, "jdbc-lock-acquisition-timeout", conf.getJdbcLockAcquisitionTimeoutMillis(), Validators.NO_CHECK));
+      conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
+      conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
       return conf;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
index 28f05b2..e963b22 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
@@ -127,7 +127,7 @@ public abstract class NodeManager implements ActiveMQComponent {
       isStarted = false;
    }
 
-   public final void stopBackup() throws Exception {
+   public void stopBackup() throws Exception {
       if (replicatedBackup && getNodeId() != null) {
          setUpServerLockFile();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 018e370..7b4adb4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -49,10 +49,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
-import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
 import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -136,7 +135,6 @@ import org.apache.activemq.artemis.core.server.ServiceComponent;
 import org.apache.activemq.artemis.core.server.ServiceRegistry;
 import org.apache.activemq.artemis.core.server.cluster.BackupManager;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
-import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
 import org.apache.activemq.artemis.core.server.files.FileMoveManager;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
@@ -144,6 +142,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
 import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
 import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
+import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
@@ -151,6 +150,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
 import org.apache.activemq.artemis.core.server.reload.ReloadManager;
 import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
@@ -176,6 +176,7 @@ import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.critical.CriticalAction;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerImpl;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
 import org.apache.activemq.artemis.utils.critical.CriticalComponent;
 import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.jboss.logging.Logger;
@@ -456,6 +457,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       NodeManager manager;
       if (!configuration.isPersistenceEnabled()) {
          manager = new InVMNodeManager(replicatingBackup);
+      } else if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
+         if (replicatingBackup) {
+            throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence");
+         }
+         final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
+         manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory, shutdownOnCriticalIO);
       } else {
          manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout());
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
new file mode 100644
index 0000000..30db629
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.jboss.logging.Logger;
+
+/**
+ * Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, IOCriticalErrorListener)}.
+ */
+final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock {
+
+   private static final Logger LOGGER = Logger.getLogger(ActiveMQScheduledLeaseLock.class);
+
+   private final String lockName;
+   private final LeaseLock lock;
+   private long lastLockRenewStart;
+   private final long renewPeriodMillis;
+   private final IOCriticalErrorListener ioCriticalErrorListener;
+
+   ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService,
+                              ArtemisExecutor executor,
+                              String lockName,
+                              LeaseLock lock,
+                              long renewPeriodMillis,
+                              IOCriticalErrorListener ioCriticalErrorListener) {
+      super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false);
+      if (renewPeriodMillis >= lock.expirationMillis()) {
+         throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis");
+      }
+      this.lockName = lockName;
+      this.lock = lock;
+      this.renewPeriodMillis = renewPeriodMillis;
+      //already expired start time
+      this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis());
+      this.ioCriticalErrorListener = ioCriticalErrorListener;
+   }
+
+   @Override
+   public long renewPeriodMillis() {
+      return renewPeriodMillis;
+   }
+
+   @Override
+   public LeaseLock lock() {
+      return lock;
+   }
+
+   @Override
+   public synchronized void start() {
+      if (isStarted()) {
+         return;
+      }
+      this.lastLockRenewStart = System.nanoTime();
+      super.start();
+   }
+
+   @Override
+   public synchronized void stop() {
+      if (!isStarted()) {
+         return;
+      }
+      super.stop();
+   }
+
+   @Override
+   public void run() {
+      final long lastRenewStart = this.lastLockRenewStart;
+      final long renewStart = System.nanoTime();
+      if (!this.lock.renew()) {
+         ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
+      }
+      //logic to detect slowness of DB and/or the scheduled executor service
+      detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());
+      this.lastLockRenewStart = lastRenewStart;
+   }
+
+   private static void detectAndReportRenewSlowness(String lockName,
+                                                    long lastRenewStart,
+                                                    long renewStart,
+                                                    long expectedRenewPeriodMillis,
+                                                    long expirationMillis) {
+      final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart);
+      if (elapsedMillisToRenew > expectedRenewPeriodMillis) {
+         LOGGER.error(lockName + " lock renew tooks " + elapsedMillisToRenew + " ms, while is supposed to take <" + expectedRenewPeriodMillis + " ms");
+      }
+      final long measuredRenewPeriodNanos = renewStart - lastRenewStart;
+      final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos);
+      if (measuredRenewPeriodMillis > expirationMillis) {
+         LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
+      } else if (measuredRenewPeriodMillis > expectedRenewPeriodMillis) {
+         LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
new file mode 100644
index 0000000..0656235
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import org.jboss.logging.Logger;
+
+/**
+ * JDBC implementation of a {@link LeaseLock} with a {@code String} defined {@link #holderId()}.
+ */
+final class JdbcLeaseLock implements LeaseLock {
+
+   private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
+   private static final int MAX_HOLDER_ID_LENGTH = 128;
+   private final Connection connection;
+   private final long maxAllowableMillisDiffFromDBTime;
+   private long millisDiffFromCurrentTime;
+   private final String holderId;
+   private final PreparedStatement tryAcquireLock;
+   private final PreparedStatement tryReleaseLock;
+   private final PreparedStatement renewLock;
+   private final PreparedStatement isLocked;
+   private final PreparedStatement currentDateTime;
+   private final long expirationMillis;
+   private boolean maybeAcquired;
+
+   /**
+    * The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
+    * whose life cycle will be managed externally.
+    */
+   JdbcLeaseLock(String holderId,
+                 Connection connection,
+                 PreparedStatement tryAcquireLock,
+                 PreparedStatement tryReleaseLock,
+                 PreparedStatement renewLock,
+                 PreparedStatement isLocked,
+                 PreparedStatement currentDateTime,
+                 long expirationMIllis,
+                 long maxAllowableMillisDiffFromDBTime) {
+      if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
+         throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
+      }
+      this.holderId = holderId;
+      this.maxAllowableMillisDiffFromDBTime = maxAllowableMillisDiffFromDBTime;
+      this.millisDiffFromCurrentTime = Long.MAX_VALUE;
+      this.tryAcquireLock = tryAcquireLock;
+      this.tryReleaseLock = tryReleaseLock;
+      this.renewLock = renewLock;
+      this.isLocked = isLocked;
+      this.currentDateTime = currentDateTime;
+      this.expirationMillis = expirationMIllis;
+      this.maybeAcquired = false;
+      this.connection = connection;
+   }
+
+   public String holderId() {
+      return holderId;
+   }
+
+   @Override
+   public long expirationMillis() {
+      return expirationMillis;
+   }
+
+   private long timeDifference() throws SQLException {
+      if (Long.MAX_VALUE == millisDiffFromCurrentTime) {
+         if (maxAllowableMillisDiffFromDBTime > 0) {
+            millisDiffFromCurrentTime = determineTimeDifference();
+         } else {
+            millisDiffFromCurrentTime = 0L;
+         }
+      }
+      return millisDiffFromCurrentTime;
+   }
+
+   private long determineTimeDifference() throws SQLException {
+      try (ResultSet resultSet = currentDateTime.executeQuery()) {
+         long result = 0L;
+         if (resultSet.next()) {
+            final Timestamp timestamp = resultSet.getTimestamp(1);
+            final long diff = System.currentTimeMillis() - timestamp.getTime();
+            if (Math.abs(diff) > maxAllowableMillisDiffFromDBTime) {
+               // off by more than maxAllowableMillisDiffFromDBTime so lets adjust
+               result = (-diff);
+            }
+            LOGGER.info(holderId() + " diff adjust from db: " + result + ", db time: " + timestamp);
+         }
+         return result;
+      }
+   }
+
+   @Override
+   public boolean renew() {
+      synchronized (connection) {
+         try {
+            final boolean result;
+            connection.setAutoCommit(false);
+            try {
+               final long timeDifference = timeDifference();
+               final PreparedStatement preparedStatement = this.renewLock;
+               final long now = System.currentTimeMillis() + timeDifference;
+               final Timestamp timestamp = new Timestamp(now + expirationMillis);
+               preparedStatement.setTimestamp(1, timestamp);
+               preparedStatement.setString(2, holderId);
+               result = preparedStatement.executeUpdate() == 1;
+            } catch (SQLException ie) {
+               connection.rollback();
+               connection.setAutoCommit(true);
+               throw new IllegalStateException(ie);
+            }
+            connection.commit();
+            connection.setAutoCommit(true);
+            return result;
+         } catch (SQLException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+   @Override
+   public boolean tryAcquire() {
+      synchronized (connection) {
+         try {
+            final boolean acquired;
+            connection.setAutoCommit(false);
+            try {
+               final long timeDifference = timeDifference();
+               final PreparedStatement preparedStatement = tryAcquireLock;
+               final long now = System.currentTimeMillis() + timeDifference;
+               preparedStatement.setString(1, holderId);
+               final Timestamp timestamp = new Timestamp(now + expirationMillis);
+               preparedStatement.setTimestamp(2, timestamp);
+               acquired = preparedStatement.executeUpdate() == 1;
+            } catch (SQLException ie) {
+               connection.rollback();
+               connection.setAutoCommit(true);
+               throw new IllegalStateException(ie);
+            }
+            connection.commit();
+            connection.setAutoCommit(true);
+            if (acquired) {
+               this.maybeAcquired = true;
+            }
+            return acquired;
+         } catch (SQLException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+   @Override
+   public boolean isHeld() {
+      return checkValidHolderId(Objects::nonNull);
+   }
+
+   @Override
+   public boolean isHeldByCaller() {
+      return checkValidHolderId(this.holderId::equals);
+   }
+
+   private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
+      synchronized (connection) {
+         try {
+            boolean result;
+            connection.setAutoCommit(false);
+            try {
+               final long timeDifference = timeDifference();
+               final PreparedStatement preparedStatement = this.isLocked;
+               try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                  if (!resultSet.next()) {
+                     result = false;
+                  } else {
+                     final String currentHolderId = resultSet.getString(1);
+                     result = holderIdFilter.test(currentHolderId);
+                     //warn about any zombie lock
+                     final Timestamp timestamp = resultSet.getTimestamp(2);
+                     if (timestamp != null) {
+                        final long lockExpirationTime = timestamp.getTime();
+                        final long now = System.currentTimeMillis() + timeDifference;
+                        final long expiredBy = now - lockExpirationTime;
+                        if (expiredBy > 0) {
+                           result = false;
+                           LOGGER.warn("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
+                        }
+                     }
+                  }
+               }
+            } catch (SQLException ie) {
+               connection.rollback();
+               connection.setAutoCommit(true);
+               throw new IllegalStateException(ie);
+            }
+            connection.commit();
+            connection.setAutoCommit(true);
+            return result;
+         } catch (SQLException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+   @Override
+   public void release() {
+      synchronized (connection) {
+         try {
+            connection.setAutoCommit(false);
+            try {
+               final PreparedStatement preparedStatement = this.tryReleaseLock;
+               preparedStatement.setString(1, holderId);
+               if (preparedStatement.executeUpdate() != 1) {
+                  LOGGER.warn(holderId + " has failed to release a lock");
+               } else {
+                  LOGGER.info(holderId + " has released a lock");
+               }
+               //consider it as released to avoid on finalize to be reclaimed
+               this.maybeAcquired = false;
+            } catch (SQLException ie) {
+               connection.rollback();
+               connection.setAutoCommit(true);
+               throw new IllegalStateException(ie);
+            }
+            connection.commit();
+            connection.setAutoCommit(true);
+         } catch (SQLException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+   @Override
+   public void close() throws SQLException {
+      synchronized (connection) {
+         //to avoid being called if not needed
+         if (!this.tryReleaseLock.isClosed()) {
+            try {
+               if (this.maybeAcquired) {
+                  release();
+               }
+            } finally {
+               this.tryReleaseLock.close();
+               this.tryAcquireLock.close();
+               this.renewLock.close();
+               this.isLocked.close();
+               this.currentDateTime.close();
+            }
+         }
+      }
+   }
+
+   @Override
+   protected void finalize() throws Throwable {
+      close();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
new file mode 100644
index 0000000..f4baeea
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import javax.sql.DataSource;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.server.ActivateCallback;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.UUID;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.jboss.logging.Logger;
+
+/**
+ * JDBC implementation of {@link NodeManager}.
+ */
+public final class JdbcNodeManager extends NodeManager {
+
+   private static final Logger logger = Logger.getLogger(JdbcNodeManager.class);
+   private static final long MAX_PAUSE_MILLIS = 2000L;
+
+   private final SharedStateManager sharedStateManager;
+   private final ScheduledLeaseLock scheduledLiveLock;
+   private final ScheduledLeaseLock scheduledBackupLock;
+   private final long lockRenewPeriodMillis;
+   private final long lockAcquisitionTimeoutMillis;
+   private volatile boolean interrupted = false;
+   private final LeaseLock.Pauser pauser;
+   private final IOCriticalErrorListener ioCriticalErrorListener;
+
+   public static JdbcNodeManager with(DatabaseStorageConfiguration configuration,
+                                      ScheduledExecutorService scheduledExecutorService,
+                                      ExecutorFactory executorFactory,
+                                      IOCriticalErrorListener ioCriticalErrorListener) {
+      if (configuration.getDataSource() != null) {
+         final String brokerId = java.util.UUID.randomUUID().toString();
+         return usingDataSource(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getDataSource(), configuration.getSqlProviderFactory().create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
+      } else {
+         final String brokerId = java.util.UUID.randomUUID().toString();
+         return usingConnectionUrl(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getJdbcConnectionUrl(), configuration.getJdbcDriverClassName(), configuration.getSqlProviderFactory().create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
+      }
+   }
+
+   static JdbcNodeManager usingDataSource(String brokerId,
+                                          long lockExpirationMillis,
+                                          long lockRenewPeriodMillis,
+                                          long lockAcquisitionTimeoutMillis,
+                                          DataSource dataSource,
+                                          SQLProvider provider,
+                                          ScheduledExecutorService scheduledExecutorService,
+                                          ExecutorFactory executorFactory,
+                                          IOCriticalErrorListener ioCriticalErrorListener) {
+      return new JdbcNodeManager(JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
+   }
+
+   public static JdbcNodeManager usingConnectionUrl(String brokerId,
+                                                    long lockExpirationMillis,
+                                                    long lockRenewPeriodMillis,
+                                                    long lockAcquisitionTimeoutMillis,
+                                                    String jdbcUrl,
+                                                    String driverClass,
+                                                    SQLProvider provider,
+                                                    ScheduledExecutorService scheduledExecutorService,
+                                                    ExecutorFactory executorFactory,
+                                                    IOCriticalErrorListener ioCriticalErrorListener) {
+      return new JdbcNodeManager(JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
+   }
+
+   private JdbcNodeManager(final SharedStateManager sharedStateManager,
+                           boolean replicatedBackup,
+                           long lockRenewPeriodMillis,
+                           long lockAcquisitionTimeoutMillis,
+                           ScheduledExecutorService scheduledExecutorService,
+                           ExecutorFactory executorFactory,
+                           IOCriticalErrorListener ioCriticalErrorListener) {
+      super(replicatedBackup, null);
+      this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
+      this.lockRenewPeriodMillis = lockRenewPeriodMillis;
+      this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
+      this.sharedStateManager = sharedStateManager;
+      this.scheduledLiveLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory.getExecutor(), "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
+      this.scheduledBackupLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory.getExecutor(), "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
+      this.ioCriticalErrorListener = ioCriticalErrorListener;
+   }
+
+   @Override
+   public synchronized void start() throws Exception {
+      if (isStarted()) {
+         return;
+      }
+      if (!replicatedBackup) {
+         final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
+         setUUID(nodeId);
+      }
+
+      super.start();
+   }
+
+   @Override
+   public synchronized void stop() throws Exception {
+      if (isStarted()) {
+         try {
+            this.scheduledLiveLock.stop();
+            this.scheduledBackupLock.stop();
+         } finally {
+            super.stop();
+            this.sharedStateManager.close();
+         }
+      }
+   }
+
+   @Override
+   protected void finalize() throws Throwable {
+      stop();
+   }
+
+   @Override
+   public boolean isAwaitingFailback() throws Exception {
+      return readSharedState() == SharedStateManager.State.FAILING_BACK;
+   }
+
+   @Override
+   public boolean isBackupLive() throws Exception {
+      //is anyone holding the live lock?
+      return this.scheduledLiveLock.lock().isHeld();
+   }
+
+   @Override
+   public void stopBackup() throws Exception {
+      if (replicatedBackup) {
+         final UUID nodeId = getUUID();
+         sharedStateManager.writeNodeId(nodeId);
+      }
+      releaseBackup();
+   }
+
+   @Override
+   public void interrupt() {
+      //need to be volatile: must be called concurrently to work as expected
+      interrupted = true;
+   }
+
+   @Override
+   public void releaseBackup() throws Exception {
+      if (this.scheduledBackupLock.lock().isHeldByCaller()) {
+         this.scheduledBackupLock.stop();
+         this.scheduledBackupLock.lock().release();
+      }
+   }
+
+   /**
+    * Try to acquire a lock, failing with an exception otherwise.
+    */
+   private void lock(LeaseLock lock) throws Exception {
+      final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(this.lockAcquisitionTimeoutMillis, this.pauser, () -> !this.interrupted);
+      switch (acquireResult) {
+         case Timeout:
+            throw new Exception("timed out waiting for lock");
+         case Exit:
+            this.interrupted = false;
+            throw new InterruptedException("LeaseLock was interrupted");
+         case Done:
+            break;
+         default:
+            throw new AssertionError(acquireResult + " not managed");
+      }
+
+   }
+
+   private void checkInterrupted(Supplier<String> message) throws InterruptedException {
+      if (this.interrupted) {
+         interrupted = false;
+         throw new InterruptedException(message.get());
+      }
+   }
+
+   private void renewLiveLockIfNeeded(final long acquiredOn) {
+      final long acquiredMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - acquiredOn);
+      if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) {
+         if (!this.scheduledLiveLock.lock().renew()) {
+            final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
+            try {
+               ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
+            } finally {
+               throw e;
+            }
+         }
+      }
+   }
+
+   /**
+    * Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise
+    */
+   private boolean lockLiveAndCheckLiveState() throws Exception {
+      lock(this.scheduledLiveLock.lock());
+      final long acquiredOn = System.nanoTime();
+      boolean liveWhileLocked = false;
+      //check if the state is live
+      final SharedStateManager.State stateWhileLocked;
+      try {
+         stateWhileLocked = readSharedState();
+      } catch (Throwable t) {
+         logger.error("error while holding the live node lock and tried to read the shared state", t);
+         this.scheduledLiveLock.lock().release();
+         throw t;
+      }
+      if (stateWhileLocked == SharedStateManager.State.LIVE) {
+         renewLiveLockIfNeeded(acquiredOn);
+         liveWhileLocked = true;
+      } else {
+         if (logger.isDebugEnabled()) {
+            logger.debug("state is " + stateWhileLocked + " while holding the live lock");
+         }
+         //state is not live: can (try to) release the lock
+         this.scheduledLiveLock.lock().release();
+      }
+      return liveWhileLocked;
+   }
+
+   @Override
+   public void awaitLiveNode() throws Exception {
+      boolean liveWhileLocked = false;
+      while (!liveWhileLocked) {
+         //check first without holding any lock
+         final SharedStateManager.State state = readSharedState();
+         if (state == SharedStateManager.State.LIVE) {
+            //verify if the state is live while holding the live node lock too
+            liveWhileLocked = lockLiveAndCheckLiveState();
+         } else {
+            if (logger.isDebugEnabled()) {
+               logger.debug("awaiting live node...state: " + state);
+            }
+         }
+         if (!liveWhileLocked) {
+            checkInterrupted(() -> "awaitLiveNode got interrupted!");
+            pauser.idle();
+         }
+      }
+      //state is LIVE and live lock is acquired and valid
+      logger.debug("acquired live node lock");
+      this.scheduledLiveLock.start();
+   }
+
+   @Override
+   public void startBackup() throws Exception {
+      assert !replicatedBackup; // should not be called if this is a replicating backup
+      ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
+
+      lock(scheduledBackupLock.lock());
+      scheduledBackupLock.start();
+      ActiveMQServerLogger.LOGGER.gotBackupLock();
+      if (getUUID() == null)
+         readNodeId();
+   }
+
+   @Override
+   public ActivateCallback startLiveNode() throws Exception {
+      setFailingBack();
+
+      final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
+
+      ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
+
+      lock(this.scheduledLiveLock.lock());
+
+      this.scheduledLiveLock.start();
+
+      ActiveMQServerLogger.LOGGER.obtainedLiveLock();
+
+      return new ActivateCallback() {
+         @Override
+         public void preActivate() {
+         }
+
+         @Override
+         public void activated() {
+         }
+
+         @Override
+         public void deActivate() {
+         }
+
+         @Override
+         public void activationComplete() {
+            try {
+               //state can be written only if the live renew task is running
+               setLive();
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+            }
+         }
+      };
+   }
+
+   @Override
+   public void pauseLiveServer() throws Exception {
+      if (scheduledLiveLock.isStarted()) {
+         setPaused();
+         scheduledLiveLock.stop();
+         scheduledLiveLock.lock().release();
+      } else if (scheduledLiveLock.lock().renew()) {
+         setPaused();
+         scheduledLiveLock.lock().release();
+      } else {
+         final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
+         try {
+            ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
+         } finally {
+            throw e;
+         }
+      }
+   }
+
+   @Override
+   public void crashLiveServer() throws Exception {
+      if (this.scheduledLiveLock.lock().isHeldByCaller()) {
+         scheduledLiveLock.stop();
+         this.scheduledLiveLock.lock().release();
+      }
+   }
+
+   @Override
+   public void awaitLiveStatus() {
+      while (readSharedState() != SharedStateManager.State.LIVE) {
+         pauser.idle();
+      }
+   }
+
+   private void setLive() {
+      writeSharedState(SharedStateManager.State.LIVE);
+   }
+
+   private void setFailingBack() {
+      writeSharedState(SharedStateManager.State.FAILING_BACK);
+   }
+
+   private void setPaused() {
+      writeSharedState(SharedStateManager.State.PAUSED);
+   }
+
+   private void writeSharedState(SharedStateManager.State state) {
+      assert !this.replicatedBackup : "the replicated backup can't write the shared state!";
+      this.sharedStateManager.writeState(state);
+   }
+
+   private SharedStateManager.State readSharedState() {
+      return this.sharedStateManager.readState();
+   }
+
+   @Override
+   public SimpleString readNodeId() {
+      final UUID nodeId = this.sharedStateManager.readNodeId();
+      setUUID(nodeId);
+      return getNodeId();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
new file mode 100644
index 0000000..dad1abc
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.utils.UUID;
+
+/**
+ * JDBC implementation of a {@link SharedStateManager}.
+ */
+@SuppressWarnings("SynchronizeOnNonFinalField")
+final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
+
+   private final String holderId;
+   private final long lockExpirationMillis;
+   private JdbcLeaseLock liveLock;
+   private JdbcLeaseLock backupLock;
+   private PreparedStatement readNodeId;
+   private PreparedStatement writeNodeId;
+   private PreparedStatement readState;
+   private PreparedStatement writeState;
+
+   public static JdbcSharedStateManager usingDataSource(String holderId,
+                                                        long locksExpirationMillis,
+                                                        DataSource dataSource,
+                                                        SQLProvider provider) {
+      final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
+      sharedStateManager.setDataSource(dataSource);
+      sharedStateManager.setSqlProvider(provider);
+      try {
+         sharedStateManager.start();
+         return sharedStateManager;
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
+      }
+   }
+
+   public static JdbcSharedStateManager usingConnectionUrl(String holderId,
+                                                           long locksExpirationMillis,
+                                                           String jdbcConnectionUrl,
+                                                           String jdbcDriverClass,
+                                                           SQLProvider provider) {
+      final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
+      sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
+      sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
+      sharedStateManager.setSqlProvider(provider);
+      try {
+         sharedStateManager.start();
+         return sharedStateManager;
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
+      }
+   }
+
+   @Override
+   protected void createSchema() throws SQLException {
+      try {
+         createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
+      } catch (SQLException e) {
+         //no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
+      }
+   }
+
+   static JdbcLeaseLock createLiveLock(String holderId,
+                                       Connection connection,
+                                       SQLProvider sqlProvider,
+                                       long expirationMillis,
+                                       long maxAllowableMillisDiffFromDBtime) throws SQLException {
+      return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
+   }
+
+   static JdbcLeaseLock createBackupLock(String holderId,
+                                         Connection connection,
+                                         SQLProvider sqlProvider,
+                                         long expirationMillis,
+                                         long maxAllowableMillisDiffFromDBtime) throws SQLException {
+      return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
+   }
+
+   @Override
+   protected void prepareStatements() throws SQLException {
+      this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
+      this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
+      this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
+      this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
+      this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL());
+      this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
+   }
+
+   private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
+      this.holderId = holderId;
+      this.lockExpirationMillis = lockExpirationMillis;
+   }
+
+   @Override
+   public LeaseLock liveLock() {
+      return this.liveLock;
+   }
+
+   @Override
+   public LeaseLock backupLock() {
+      return this.backupLock;
+   }
+
+   private UUID rawReadNodeId() throws SQLException {
+      final PreparedStatement preparedStatement = this.readNodeId;
+      try (ResultSet resultSet = preparedStatement.executeQuery()) {
+         if (!resultSet.next()) {
+            return null;
+         } else {
+            final String nodeId = resultSet.getString(1);
+            if (nodeId != null) {
+               return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId));
+            } else {
+               return null;
+            }
+         }
+      }
+   }
+
+   @Override
+   public UUID readNodeId() {
+      synchronized (connection) {
+         try {
+            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+            connection.setAutoCommit(true);
+            final UUID nodeId = rawReadNodeId();
+            return nodeId;
+         } catch (SQLException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+   @Override
+   public void writeNodeId(UUID nodeId) {
+      synchronized (connection) {
+         try {
+            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+            connection.setAutoCommit(true);
+            rawWriteNodeId(nodeId);
+         } catch (SQLException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+   private void rawWriteNodeId(UUID nodeId) throws SQLException {
+      final PreparedStatement preparedStatement = this.writeNodeId;
+      preparedStatement.setString(1, nodeId.toString());
+      if (preparedStatement.executeUpdate() != 1) {
+         throw new IllegalStateException("can't write NODE_ID on the JDBC Node Manager Store!");
+      }
+   }
+
+   @Override
+   public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
+      //uses a single transaction to make everything
+      synchronized (connection) {
+         try {
+            final UUID nodeId;
+            connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+            connection.setAutoCommit(false);
+            try {
+               UUID readNodeId = rawReadNodeId();
+               if (readNodeId == null) {
+                  nodeId = nodeIdFactory.get();
+                  rawWriteNodeId(nodeId);
+               } else {
+                  nodeId = readNodeId;
+               }
+            } catch (SQLException e) {
+               connection.rollback();
+               connection.setAutoCommit(true);
+               throw e;
+            }
+            connection.commit();
+            connection.setAutoCommit(true);
+            return nodeId;
+         } catch (SQLException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+   private static State decodeState(String s) {
+      if (s == null) {
+         return State.NOT_STARTED;
+      }
+      switch (s) {
+         case "L":
+            return State.LIVE;
+         case "F":
+            return State.FAILING_BACK;
+         case "P":
+            return State.PAUSED;
+         case "N":
+            return State.NOT_STARTED;
+         default:
+            throw new IllegalStateException("unknown state [" + s + "]");
+      }
+   }
+
+   private static String encodeState(State state) {
+      switch (state) {
+         case LIVE:
+            return "L";
+         case FAILING_BACK:
+            return "F";
+         case PAUSED:
+            return "P";
+         case NOT_STARTED:
+            return "N";
+         default:
+            throw new IllegalStateException("unknown state [" + state + "]");
+      }
+   }
+
+   @Override
+   public State readState() {
+      synchronized (connection) {
+         try {
+            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+            connection.setAutoCommit(true);
+            final State state;
+            final PreparedStatement preparedStatement = this.readState;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+               if (!resultSet.next()) {
+                  state = State.FIRST_TIME_START;
+               } else {
+                  state = decodeState(resultSet.getString(1));
+               }
+            }
+            return state;
+         } catch (SQLException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+   @Override
+   public void writeState(State state) {
+      final String encodedState = encodeState(state);
+      synchronized (connection) {
+         try {
+            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+            connection.setAutoCommit(true);
+            final PreparedStatement preparedStatement = this.writeState;
+            preparedStatement.setString(1, encodedState);
+            if (preparedStatement.executeUpdate() != 1) {
+               throw new IllegalStateException("can't write STATE to the JDBC Node Manager Store!");
+            }
+         } catch (SQLException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+   @Override
+   public void stop() throws SQLException {
+      //release all the managed resources inside the connection lock
+      if (sqlProvider.closeConnectionOnShutdown()) {
+         synchronized (connection) {
+            this.readNodeId.close();
+            this.writeNodeId.close();
+            this.readState.close();
+            this.writeState.close();
+            this.liveLock.close();
+            this.backupLock.close();
+            super.stop();
+         }
+      }
+   }
+
+   @Override
+   public void close() throws SQLException {
+      stop();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
new file mode 100644
index 0000000..8deda12
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * It represents a lock that can't be held more than {@link #expirationMillis()} without being renewed.
+ *
+ * <p>
+ * An implementor must provide implicitly the caller identity to contextualize each operation (eg {@link JdbcLeaseLock}
+ * uses one caller per instance)
+ */
+interface LeaseLock extends AutoCloseable {
+
+   enum AcquireResult {
+      Timeout, Exit, Done
+   }
+
+   interface ExitCondition {
+
+      /**
+       * @return true as long as we should keep running
+       */
+      boolean keepRunning();
+   }
+
+   interface Pauser {
+
+      void idle();
+
+      static Pauser sleep(long idleTime, TimeUnit timeUnit) {
+         final long idleNanos = timeUnit.toNanos(idleTime);
+         //can fail spuriously but doesn't throw any InterruptedException
+         return () -> LockSupport.parkNanos(idleNanos);
+      }
+
+      static Pauser noWait() {
+         return () -> {
+         };
+      }
+   }
+
+   /**
+    * The expiration in milliseconds from the last valid acquisition/renew.
+    */
+   default long expirationMillis() {
+      return Long.MAX_VALUE;
+   }
+
+   /**
+    * It extends the lock expiration (if held) to {@link System#currentTimeMillis()} + {@link #expirationMillis()}.
+    *
+    * @return {@code true} if the expiration has been moved on, {@code false} otherwise
+    */
+   default boolean renew() {
+      return true;
+   }
+
+   /**
+    * Not reentrant lock acquisition operation.
+    * The lock can be acquired if is not held by anyone (including the caller) or has an expired ownership.
+    *
+    * @return {@code true} if has been acquired, {@code false} otherwise
+    */
+   boolean tryAcquire();
+
+   /**
+    * Not reentrant lock acquisition operation (ie {@link #tryAcquire()}).
+    * It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done})or got interrupted (ie {@link AcquireResult#Exit}).
+    * After each failed attempt is performed a {@link Pauser#idle} call.
+    */
+   default AcquireResult tryAcquire(ExitCondition exitCondition, Pauser pauser) {
+      while (exitCondition.keepRunning()) {
+         if (tryAcquire()) {
+            return AcquireResult.Done;
+         } else {
+            pauser.idle();
+         }
+      }
+      return AcquireResult.Exit;
+   }
+
+   /**
+    * Not reentrant lock acquisition operation (ie {@link #tryAcquire()}).
+    * It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done}), got interrupted (ie {@link AcquireResult#Exit})
+    * or exceed {@code tryAcquireTimeoutMillis}.
+    * After each failed attempt is performed a {@link Pauser#idle} call.
+    * If the specified timeout is <=0 then it behaves as {@link #tryAcquire(ExitCondition, Pauser)}.
+    */
+   default AcquireResult tryAcquire(long tryAcquireTimeoutMillis, Pauser pauser, ExitCondition exitCondition) {
+      if (tryAcquireTimeoutMillis < 0) {
+         return tryAcquire(exitCondition, pauser);
+      }
+      final long timeoutInNanosecond = TimeUnit.MILLISECONDS.toNanos(tryAcquireTimeoutMillis);
+      final long startAcquire = System.nanoTime();
+      while (exitCondition.keepRunning()) {
+         if (tryAcquire()) {
+            return AcquireResult.Done;
+         } else if (System.nanoTime() - startAcquire >= timeoutInNanosecond) {
+            return AcquireResult.Timeout;
+         } else {
+            pauser.idle();
+            //check before doing anything if time is expired
+            if (System.nanoTime() - startAcquire >= timeoutInNanosecond) {
+               return AcquireResult.Timeout;
+            }
+         }
+      }
+      return AcquireResult.Exit;
+   }
+
+   /**
+    * @return {@code true} if there is a valid (ie not expired) owner, {@code false} otherwise
+    */
+   boolean isHeld();
+
+   /**
+    * @return {@code true} if the caller is a valid (ie not expired) owner, {@code false} otherwise
+    */
+   boolean isHeldByCaller();
+
+   /**
+    * It release the lock itself and all the resources used by it (eg connections, file handlers)
+    */
+   @Override
+   default void close() throws Exception {
+      release();
+   }
+
+   /**
+    * Perform the release if this lock is held by the caller.
+    */
+   void release();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
new file mode 100644
index 0000000..43751f8
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/**
+ * {@link LeaseLock} holder that allows to schedule a {@link LeaseLock#renew} task with a fixed {@link #renewPeriodMillis()} delay.
+ */
+interface ScheduledLeaseLock extends ActiveMQComponent {
+
+   LeaseLock lock();
+
+   long renewPeriodMillis();
+
+   static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService,
+                                ArtemisExecutor executor,
+                                String lockName,
+                                LeaseLock lock,
+                                long renewPeriodMillis,
+                                IOCriticalErrorListener ioCriticalErrorListener) {
+      return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
new file mode 100644
index 0000000..e26879c
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.utils.UUID;
+
+/**
+ * Facade to abstract the operations on the shared state (inter-process and/or inter-thread) necessary to coordinate broker nodes.
+ */
+interface SharedStateManager extends AutoCloseable {
+
+   enum State {
+      LIVE, PAUSED, FAILING_BACK, NOT_STARTED, FIRST_TIME_START
+   }
+
+   LeaseLock liveLock();
+
+   LeaseLock backupLock();
+
+   UUID readNodeId();
+
+   void writeNodeId(UUID nodeId);
+
+   /**
+    * Purpose of this method is to setup the environment to provide a shared state between live/backup servers.
+    * That means:
+    * - check if a shared state exist and create it/wait for it if not
+    * - check if a nodeId exists and create it if not
+    *
+    * @param nodeIdFactory used to create the nodeId if needed
+    * @return the newly created NodeId or the old one if already present
+    */
+   UUID setup(Supplier<? extends UUID> nodeIdFactory);
+
+   State readState();
+
+   void writeState(State state);
+
+   @Override
+   default void close() throws Exception {
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 5bdc598..a615554 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1884,6 +1884,27 @@
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
+         <xsd:element name="jdbc-lock-acquisition-timeout" type="xsd:int" minOccurs="0" maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The max allowed time in milliseconds while trying to acquire a JDBC lock.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+         <xsd:element name="jdbc-lock-renew-period" type="xsd:int" minOccurs="0" maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The period in milliseconds of the keep alive service of a JDBC lock.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+         <xsd:element name="jdbc-lock-expiration" type="xsd:int" minOccurs="0" maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The time in milliseconds a JDBC lock is considered valid without keeping it alive.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
       </xsd:all>
    </xsd:complexType>
 


Mime
View raw message