activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4365 - allow lease locker to be used by kahadb - remove deps on jdbc pa. LockableService now passes a reference to a locker so it can pull the brokerService, extracted some of the jdbc lock common stu
Date Fri, 20 Sep 2013 14:34:18 GMT
Updated Branches:
  refs/heads/trunk 0f90695db -> efaa351db


https://issues.apache.org/jira/browse/AMQ-4365 - allow lease locker to be used by kahadb -
remove deps on jdbc pa. LockableService now passes a reference to a locker so it can pull
the brokerService, extracted some of the jdbc lock common stuff, additional test kahadb with
jdbc lease


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/efaa351d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/efaa351d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/efaa351d

Branch: refs/heads/trunk
Commit: efaa351db77f7fa52f29214c2d5e17f9e21d2eee
Parents: 0f90695
Author: gtully <gary.tully@gmail.com>
Authored: Fri Sep 20 15:33:24 2013 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Fri Sep 20 15:33:24 2013 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/AbstractLocker.java  |  10 ++
 .../activemq/broker/LockableServiceSupport.java |   7 +-
 .../java/org/apache/activemq/broker/Locker.java |   4 +
 .../activemq/store/jdbc/AbstractJDBCLocker.java | 126 +++++++++++++++++++
 .../store/jdbc/DefaultDatabaseLocker.java       |  33 +----
 .../store/jdbc/LeaseDatabaseLocker.java         |  86 ++-----------
 .../apache/activemq/store/jdbc/Statements.java  |  28 ++++-
 .../jdbc/adapter/TransactDatabaseLocker.java    |   3 +-
 .../replicated/ElectingLevelDBStore.scala       |   3 +-
 .../ft/DbRestartJDBCQueueMasterSlaveTest.java   |   2 +-
 .../broker/ft/JDBCQueueMasterSlaveTest.java     |  59 +--------
 .../broker/ft/SyncCreateDataSource.java         |  86 +++++++++++++
 .../ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java | 110 ++++++++++++++++
 13 files changed, 383 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
index 8a56e87..38aab9b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
@@ -27,6 +27,7 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker
{
     protected String name;
     protected boolean failIfLocked = false;
     protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
+    protected LockableServiceSupport lockable;
 
     @Override
     public boolean keepAlive() throws IOException {
@@ -38,6 +39,10 @@ public abstract class AbstractLocker extends ServiceSupport implements
Locker {
         this.lockAcquireSleepInterval = lockAcquireSleepInterval;
     }
 
+    public long getLockAcquireSleepInterval() {
+        return lockAcquireSleepInterval;
+    }
+
     @Override
     public void setName(String name) {
         this.name = name;
@@ -47,4 +52,9 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker
{
     public void setFailIfLocked(boolean failIfLocked) {
         this.failIfLocked = failIfLocked;
     }
+
+    @Override
+    public void setLockable(LockableServiceSupport lockableServiceSupport) {
+        this.lockable = lockableServiceSupport;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
index 78480e6..4f83d30 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
@@ -61,6 +61,7 @@ public abstract class LockableServiceSupport extends ServiceSupport implements
L
     @Override
     public void setLocker(Locker locker) throws IOException {
         this.locker = locker;
+        locker.setLockable(this);
         if (this instanceof PersistenceAdapter) {
             this.locker.configure((PersistenceAdapter)this);
         }
@@ -68,7 +69,7 @@ public abstract class LockableServiceSupport extends ServiceSupport implements
L
 
     public Locker getLocker() throws IOException {
         if (this.locker == null) {
-            this.locker = createDefaultLocker();
+            setLocker(createDefaultLocker());
         }
         return this.locker;
     }
@@ -165,4 +166,8 @@ public abstract class LockableServiceSupport extends ServiceSupport implements
L
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
     }
+
+    public BrokerService getBrokerService() {
+        return this.brokerService;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-broker/src/main/java/org/apache/activemq/broker/Locker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Locker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Locker.java
index 6415def..11a2636 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/Locker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Locker.java
@@ -54,6 +54,10 @@ public interface Locker extends Service {
      */
     public void setFailIfLocked(boolean failIfLocked);
 
+    /**
+     * A reference to what is locked
+     */
+    public void setLockable(LockableServiceSupport lockable);
 
     /**
      * Optionally configure the locker with the persistence adapter currently used

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java
new file mode 100644
index 0000000..e3cc801
--- /dev/null
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import javax.sql.DataSource;
+import org.apache.activemq.broker.AbstractLocker;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractJDBCLocker extends AbstractLocker {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCLocker.class);
+    protected DataSource dataSource;
+    protected Statements statements;
+
+    protected boolean createTablesOnStartup;
+    protected int queryTimeout = -1;
+
+    public void configure(PersistenceAdapter adapter) throws IOException {
+        if (adapter instanceof JDBCPersistenceAdapter) {
+            this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
+            this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
+        }
+    }
+
+    public void setDataSource(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    public void setStatements(Statements statements) {
+        this.statements = statements;
+    }
+
+    protected void setQueryTimeout(Statement statement) throws SQLException {
+        if (queryTimeout > 0) {
+            statement.setQueryTimeout(queryTimeout);
+        }
+    }
+
+    public int getQueryTimeout() {
+        return queryTimeout;
+    }
+
+    public void setQueryTimeout(int queryTimeout) {
+        this.queryTimeout = queryTimeout;
+    }
+
+    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
+        this.createTablesOnStartup = createTablesOnStartup;
+    }
+
+    protected Connection getConnection() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    protected void close(Connection connection) {
+        if (null != connection) {
+            try {
+                connection.close();
+            } catch (SQLException e1) {
+                LOG.debug("exception while closing connection: " + e1, e1);
+            }
+        }
+    }
+
+    protected void close(Statement statement) {
+        if (null != statement) {
+            try {
+                statement.close();
+            } catch (SQLException e1) {
+                LOG.debug("exception while closing statement: " + e1, e1);
+            }
+        }
+    }
+
+    @Override
+    public void preStart() {
+        if (createTablesOnStartup) {
+            String[] createStatements = this.statements.getCreateLockSchemaStatements();
+
+            Connection connection = null;
+            Statement statement = null;
+            try {
+                connection = getConnection();
+                statement = connection.createStatement();
+                setQueryTimeout(statement);
+
+                for (int i = 0; i < createStatements.length; i++) {
+                    LOG.debug("Executing SQL: " + createStatements[i]);
+                    try {
+                        statement.execute(createStatements[i]);
+                    } catch (SQLException e) {
+                        LOG.info("Could not create lock tables; they could already exist."
+ " Failure was: "
+                                + createStatements[i] + " Message: " + e.getMessage() + "
SQLState: " + e.getSQLState()
+                                + " Vendor code: " + e.getErrorCode());
+                    }
+                }
+            } catch (SQLException e) {
+                LOG.warn("Could not create lock tables; Failure Message: " + e.getMessage()
+ " SQLState: " + e.getSQLState()
+                        + " Vendor code: " + e.getErrorCode(), e);
+            } finally {
+                close(statement);
+                close(connection);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
index 79c7a84..64d24ab 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
@@ -21,11 +21,6 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
-
-import javax.sql.DataSource;
-
-import org.apache.activemq.broker.AbstractLocker;
-import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.util.Handler;
 import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
@@ -38,27 +33,15 @@ import org.slf4j.LoggerFactory;
  * @org.apache.xbean.XBean element="database-locker"
  * 
  */
-public class DefaultDatabaseLocker extends AbstractLocker {
+public class DefaultDatabaseLocker extends AbstractJDBCLocker {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class);
-    protected DataSource dataSource;
-    protected Statements statements;
 
     protected volatile PreparedStatement lockCreateStatement;
     protected volatile PreparedStatement lockUpdateStatement;
     protected volatile Connection connection;
-    protected volatile boolean stopping;
     protected Handler<Exception> exceptionHandler;
-    protected int queryTimeout = 10;
 
-    public void configure(PersistenceAdapter adapter) throws IOException {
-        if (adapter instanceof JDBCPersistenceAdapter) {
-            this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
-            this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
-        }
-    }
-    
     public void doStart() throws Exception {
-        stopping = false;
 
         LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
         String sql = statements.getLockCreateStatement();
@@ -73,7 +56,7 @@ public class DefaultDatabaseLocker extends AbstractLocker {
                 break;
             } catch (Exception e) {
                 try {
-                    if (stopping) {
+                    if (isStopping()) {
                         throw new Exception(
                                 "Cannot start broker as being asked to shut down. " 
                                         + "Interrupted attempt to acquire lock: "
@@ -136,7 +119,6 @@ public class DefaultDatabaseLocker extends AbstractLocker {
     }
 
     public void doStop(ServiceStopper stopper) throws Exception {
-        stopping = true;
         try {
             if (lockCreateStatement != null) {
                 lockCreateStatement.cancel();    			
@@ -178,9 +160,7 @@ public class DefaultDatabaseLocker extends AbstractLocker {
         try {
             lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
             lockUpdateStatement.setLong(1, System.currentTimeMillis());
-            if (queryTimeout > 0) {
-                lockUpdateStatement.setQueryTimeout(queryTimeout);
-            }
+            setQueryTimeout(lockUpdateStatement);
             int rows = lockUpdateStatement.executeUpdate();
             if (rows == 1) {
                 result=true;
@@ -216,11 +196,4 @@ public class DefaultDatabaseLocker extends AbstractLocker {
         this.exceptionHandler = exceptionHandler;
     }
 
-    public int getQueryTimeout() {
-        return queryTimeout;
-    }
-
-    public void setQueryTimeout(int queryTimeout) {
-        this.queryTimeout = queryTimeout;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
index 37c7064..029b1df 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
@@ -24,10 +24,6 @@ import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
-import javax.sql.DataSource;
-
-import org.apache.activemq.broker.AbstractLocker;
-import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
@@ -40,42 +36,27 @@ import org.slf4j.LoggerFactory;
  * @org.apache.xbean.XBean element="lease-database-locker"
  * 
  */
-public class LeaseDatabaseLocker extends AbstractLocker {
+public class LeaseDatabaseLocker extends AbstractJDBCLocker {
     private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
-    protected DataSource dataSource;
-    protected Statements statements;
 
-    protected boolean stopping;
     protected int maxAllowableDiffFromDBTime = 0;
     protected long diffFromCurrentTime = Long.MAX_VALUE;
     protected String leaseHolderId;
-    protected int queryTimeout = -1;
-    JDBCPersistenceAdapter persistenceAdapter;
-
 
-    public void configure(PersistenceAdapter adapter) throws IOException {
-        if (adapter instanceof JDBCPersistenceAdapter) {
-            this.persistenceAdapter = (JDBCPersistenceAdapter)adapter;
-            this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
-            this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
-        }
-    }
-    
     public void doStart() throws Exception {
-        stopping = false;
 
-        if (lockAcquireSleepInterval < persistenceAdapter.getLockKeepAlivePeriod()) {
-            LOG.warn("Persistence adapter keep alive period: " + persistenceAdapter.getLockKeepAlivePeriod()
+        if (lockAcquireSleepInterval < lockable.getLockKeepAlivePeriod()) {
+            LOG.warn("LockableService keep alive period: " + lockable.getLockKeepAlivePeriod()
                     + ", which renews the lease, is less than lockAcquireSleepInterval: "
+ lockAcquireSleepInterval
                     + ", the lease duration. These values will allow the lease to expire.");
         }
 
-        LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the
Master broker");
+        LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the
master");
         String sql = statements.getLeaseObtainStatement();
         LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
 
         long now = 0l;
-        while (!stopping) {
+        while (!isStopping()) {
             Connection connection = null;
             PreparedStatement statement = null;
             try {
@@ -110,43 +91,13 @@ public class LeaseDatabaseLocker extends AbstractLocker {
             LOG.info(getLeaseHolderId() + " failed to acquire lease.  Sleeping for " + lockAcquireSleepInterval
+ " milli(s) before trying again...");
             TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
         }
-        if (stopping) {
+        if (isStopping()) {
             throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to
stop");
         }
 
         LOG.info(getLeaseHolderId() + ", becoming master with lease expiry " + new Date(now)
+ " on dataSource: " + dataSource);
     }
 
-    private void setQueryTimeout(PreparedStatement statement) throws SQLException {
-        if (queryTimeout > 0) {
-            statement.setQueryTimeout(queryTimeout);
-        }
-    }
-
-    private Connection getConnection() throws SQLException {
-        return dataSource.getConnection();
-    }
-
-    private void close(Connection connection) {
-        if (null != connection) {
-            try {
-                connection.close();
-            } catch (SQLException e1) {
-                LOG.debug(getLeaseHolderId() + " caught exception while closing connection:
" + e1, e1);
-            }
-        }
-    }
-
-    private void close(PreparedStatement statement) {
-        if (null != statement) {
-            try {
-                statement.close();
-            } catch (SQLException e1) {
-                LOG.debug(getLeaseHolderId() + ", caught while closing statement: " + e1,
e1);
-            }
-        }
-    }
-
     private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException
{
         PreparedStatement statement = null;
         try {
@@ -188,8 +139,7 @@ public class LeaseDatabaseLocker extends AbstractLocker {
     }
 
     public void doStop(ServiceStopper stopper) throws Exception {
-        stopping = true;
-        if (persistenceAdapter.getBrokerService() != null && persistenceAdapter.getBrokerService().isRestartRequested())
{
+        if (lockable.getBrokerService() != null && lockable.getBrokerService().isRestartRequested())
{
             // keep our lease for restart
             return;
         }
@@ -244,7 +194,7 @@ public class LeaseDatabaseLocker extends AbstractLocker {
         } catch (Exception e) {
             LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
             IOException ioe = IOExceptionSupport.create(e);
-            persistenceAdapter.getBrokerService().handleIOException(ioe);
+            lockable.getBrokerService().handleIOException(ioe);
             throw ioe;
         } finally {
             close(statement);
@@ -253,26 +203,10 @@ public class LeaseDatabaseLocker extends AbstractLocker {
         return result;
     }
 
-    public long getLockAcquireSleepInterval() {
-        return lockAcquireSleepInterval;
-    }
-
-    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
-        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
-    }
-    
-    public int getQueryTimeout() {
-        return queryTimeout;
-    }
-
-    public void setQueryTimeout(int queryTimeout) {
-        this.queryTimeout = queryTimeout;
-    }
-
     public String getLeaseHolderId() {
         if (leaseHolderId == null) {
-            if (persistenceAdapter.getBrokerService() != null) {
-                leaseHolderId = persistenceAdapter.getBrokerService().getBrokerName();
+            if (lockable.getBrokerService() != null) {
+                leaseHolderId = lockable.getBrokerService().getBrokerName();
             }
         }
         return leaseHolderId;

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
index d23adac..5df6f2e 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
@@ -57,6 +57,7 @@ public class Statements {
     private String removeAllMessagesStatement;
     private String removeAllSubscriptionsStatement;
     private String[] createSchemaStatements;
+    private String[] createLockSchemaStatements;
     private String[] dropSchemaStatements;
     private String lockCreateStatement;
     private String lockUpdateStatement;
@@ -106,10 +107,6 @@ public class Statements {
                     + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
                     + " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID "
+ sequenceDataType
                     + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", 
-                "CREATE TABLE " + getFullLockTableName() 
-                    + "( ID " + longDataType + " NOT NULL, TIME " + longDataType
-                    + ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
-                "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
                 "ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
                 "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName()
+ " (PRIORITY)",
                 "ALTER TABLE " + getFullMessageTableName() + " ADD XID " + stringIdDataType,
@@ -121,7 +118,24 @@ public class Statements {
                 "CREATE INDEX " + getFullAckTableName() + "_XIDX ON " + getFullAckTableName()
+ " (XID)"
             };
         }
-        return createSchemaStatements;
+        getCreateLockSchemaStatements();
+        String[] allCreateStatements = new String[createSchemaStatements.length + createLockSchemaStatements.length];
+        System.arraycopy(createSchemaStatements, 0, allCreateStatements, 0, createSchemaStatements.length);
+        System.arraycopy(createLockSchemaStatements, 0, allCreateStatements, createSchemaStatements.length,
createLockSchemaStatements.length);
+
+        return allCreateStatements;
+    }
+
+    public String[] getCreateLockSchemaStatements() {
+        if (createLockSchemaStatements == null) {
+            createLockSchemaStatements = new String[] {
+                "CREATE TABLE " + getFullLockTableName()
+                    + "( ID " + longDataType + " NOT NULL, TIME " + longDataType
+                    + ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
+                "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)"
+            };
+        }
+        return createLockSchemaStatements;
     }
 
     public String getDropAckPKAlterStatementEnd() {
@@ -762,6 +776,10 @@ public class Statements {
         this.createSchemaStatements = createSchemaStatments;
     }
 
+    public void setCreateLockSchemaStatements(String[] createLockSchemaStatments) {
+        this.createLockSchemaStatements = createLockSchemaStatments;
+    }
+
     public void setDeleteOldMessagesStatementWithPriority(String deleteOldMessagesStatementWithPriority)
{
         this.deleteOldMessagesStatementWithPriority = deleteOldMessagesStatementWithPriority;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
index 8cdef57..d69239c 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
@@ -38,7 +38,6 @@ public class TransactDatabaseLocker extends DefaultDatabaseLocker {
     
     @Override
     public void doStart() throws Exception {
-        stopping = false;
 
         LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
         PreparedStatement statement = null;
@@ -57,7 +56,7 @@ public class TransactDatabaseLocker extends DefaultDatabaseLocker {
                 }
                 break;
             } catch (Exception e) {
-                if (stopping) {
+                if (isStopping()) {
                     throw new Exception("Cannot start broker as being asked to shut down.
Interrupted attempt to acquire lock: " + e, e);
                 }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
index e7fa916..368ea96 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
@@ -22,7 +22,7 @@ import org.apache.activemq.util.{JMXSupport, ServiceStopper, ServiceSupport}
 import org.apache.activemq.leveldb.{LevelDBStoreViewMBean, LevelDBClient, RecordLog, LevelDBStore}
 import java.net.{NetworkInterface, InetAddress}
 import org.fusesource.hawtdispatch._
-import org.apache.activemq.broker.Locker
+import org.apache.activemq.broker.{LockableServiceSupport, Locker}
 import org.apache.activemq.store.PersistenceAdapter
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
@@ -190,6 +190,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
 
   def createDefaultLocker(): Locker = new Locker {
 
+    def setLockable(lockable: LockableServiceSupport) {}
     def configure(persistenceAdapter: PersistenceAdapter) {}
     def setFailIfLocked(failIfLocked: Boolean) {}
     def setLockAcquireSleepInterval(lockAcquireSleepInterval: Long) {}

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
index ce10222..fb04803 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
@@ -43,7 +43,7 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest
         verifyExpectedBroker(inflightMessageCount);
         if (++inflightMessageCount == failureCount) {
             LOG.info("STOPPING DB!@!!!!");
-            final EmbeddedDataSource ds = ((SyncDataSource)getExistingDataSource()).getDelegate();
+            final EmbeddedDataSource ds = ((SyncCreateDataSource)getExistingDataSource()).getDelegate();
             ds.setShutdownDatabase("shutdown");
             LOG.info("DB STOPPED!@!!!!");
             

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
index 4ce01fb..c7b0ec6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
@@ -41,7 +41,7 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport
{
 
     protected void setUp() throws Exception {
         // startup db
-        sharedDs = new SyncDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
+        sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
         super.setUp();
     }
 
@@ -109,61 +109,4 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport
{
         return sharedDs;
     }
 
-    // prevent concurrent calls from attempting to create the db at the same time
-    // can result in "already exists in this jvm" errors
-    class SyncDataSource implements DataSource {
-        final EmbeddedDataSource delegate;
-        SyncDataSource(EmbeddedDataSource dataSource) {
-            this.delegate = dataSource;
-        }
-            @Override
-            public Connection getConnection() throws SQLException {
-                synchronized (this) {
-                    return delegate.getConnection();
-                }
-            }
-
-            @Override
-            public Connection getConnection(String username, String password) throws SQLException
{
-                synchronized (this) {
-                    return delegate.getConnection();
-                }
-            }
-
-            @Override
-            public PrintWriter getLogWriter() throws SQLException {
-                return null;
-            }
-
-            @Override
-            public void setLogWriter(PrintWriter out) throws SQLException {
-            }
-
-            @Override
-            public void setLoginTimeout(int seconds) throws SQLException {
-            }
-
-            @Override
-            public int getLoginTimeout() throws SQLException {
-                return 0;
-            }
-
-            @Override
-            public <T> T unwrap(Class<T> iface) throws SQLException {
-                return null;
-            }
-
-            @Override
-            public boolean isWrapperFor(Class<?> iface) throws SQLException {
-                return false;
-            }
-
-            EmbeddedDataSource getDelegate() {
-                return delegate;
-            }
-
-            public Logger getParentLogger() throws SQLFeatureNotSupportedException {
-                return null;
-            }
-        };
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java
new file mode 100644
index 0000000..5331a22
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.logging.Logger;
+import javax.sql.DataSource;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+// prevent concurrent calls from attempting to create the db at the same time
+// can result in "already exists in this jvm" errors
+
+public class SyncCreateDataSource implements DataSource {
+    final EmbeddedDataSource delegate;
+
+    SyncCreateDataSource(EmbeddedDataSource dataSource) {
+        this.delegate = dataSource;
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        synchronized (this) {
+            return delegate.getConnection();
+        }
+    }
+
+    @Override
+    public Connection getConnection(String username, String password) throws SQLException
{
+        synchronized (this) {
+            return delegate.getConnection();
+        }
+    }
+
+    @Override
+    public PrintWriter getLogWriter() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void setLogWriter(PrintWriter out) throws SQLException {
+    }
+
+    @Override
+    public int getLoginTimeout() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public void setLoginTimeout(int seconds) throws SQLException {
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return false;
+    }
+
+    EmbeddedDataSource getDelegate() {
+        return delegate;
+    }
+
+    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaa351d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
new file mode 100644
index 0000000..ba59eb7
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.IOException;
+import java.net.URI;
+import javax.sql.DataSource;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
+import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.apache.activemq.store.jdbc.Statements;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOHelper;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
+    protected DataSource sharedDs;
+    protected String MASTER_URL = "tcp://localhost:62001";
+    protected String SLAVE_URL  = "tcp://localhost:62002";
+
+    protected void setUp() throws Exception {
+        // startup db
+        sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
+        super.setUp();
+    }
+
+    protected void createMaster() throws Exception {
+        master = new BrokerService();
+        master.setBrokerName("master");
+        master.addConnector(MASTER_URL);
+        master.setUseJmx(false);
+        master.setPersistent(true);
+        master.setDeleteAllMessagesOnStartup(true);
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) master.getPersistenceAdapter();
+        LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
+        leaseDatabaseLocker.setCreateTablesOnStartup(true);
+        leaseDatabaseLocker.setDataSource(getExistingDataSource());
+        leaseDatabaseLocker.setStatements(new Statements());
+        configureLocker(kahaDBPersistenceAdapter);
+        kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker);
+        configureBroker(master);
+        master.start();
+    }
+
+    protected void configureBroker(BrokerService brokerService) {
+        DefaultIOExceptionHandler stopBrokerOnStoreException = new DefaultIOExceptionHandler();
+        // we want any store io exception to stop the broker
+        stopBrokerOnStoreException.setIgnoreSQLExceptions(false);
+        brokerService.setIoExceptionHandler(stopBrokerOnStoreException);
+    }
+
+    protected void createSlave() throws Exception {
+        // use a separate thread as the slave will block waiting for
+        // the exclusive db lock
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    BrokerService broker = new BrokerService();
+                    broker.setBrokerName("slave");
+                    TransportConnector connector = new TransportConnector();
+                    connector.setUri(new URI(SLAVE_URL));
+                    broker.addConnector(connector);
+                    broker.setUseJmx(false);
+                    broker.setPersistent(true);
+                    KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)
broker.getPersistenceAdapter();
+                    LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
+                    leaseDatabaseLocker.setDataSource(getExistingDataSource());
+                    leaseDatabaseLocker.setStatements(new Statements());
+                    configureLocker(kahaDBPersistenceAdapter);
+                    kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker);
+
+                    configureBroker(broker);
+                    broker.start();
+                    slave.set(broker);
+                    slaveStarted.countDown();
+                } catch (IllegalStateException expectedOnShutdown) {
+                } catch (Exception e) {
+                    fail("failed to start slave broker, reason:" + e);
+                }
+            }
+        };
+        t.start();
+    }
+
+    protected void configureLocker(KahaDBPersistenceAdapter kahaDBPersistenceAdapter) throws
IOException {
+        kahaDBPersistenceAdapter.setLockKeepAlivePeriod(500);
+        kahaDBPersistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
+    }
+
+    protected DataSource getExistingDataSource() throws Exception {
+        return sharedDs;
+    }
+
+}


Mime
View raw message