activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: AMQ-6370 - move cleanup lock to connection allocation to avoid contention with between store and connection pool. pool connection grant or block is now guarded by store lock which lasts till connection is closed
Date Thu, 21 Jul 2016 09:58:57 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 9f7d70ba0 -> 2a815c2e0


AMQ-6370 - move cleanup lock to connection allocation to avoid contention with between store and connection pool. pool connection grant or block is now guarded by store lock which lasts till connection is closed


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2a815c2e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2a815c2e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2a815c2e

Branch: refs/heads/master
Commit: 2a815c2e08a5e2f5a056fdafbcd9c30dacf369d5
Parents: 9f7d70b
Author: gtully <gary.tully@gmail.com>
Authored: Thu Jul 21 10:54:26 2016 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Thu Jul 21 10:54:40 2016 +0100

----------------------------------------------------------------------
 .../store/jdbc/JDBCPersistenceAdapter.java      |   3 +
 .../activemq/store/jdbc/TransactionContext.java | 313 ++++++++++++++++++-
 .../store/jdbc/adapter/BlobJDBCAdapter.java     |   4 -
 .../store/jdbc/adapter/DefaultJDBCAdapter.java  |  91 +-----
 .../DefaultJDBCAdapterDoCreateTablesTest.java   |  22 +-
 .../store/jdbc/JDBCCleanupLimitedPoolTest.java  | 151 +++++++++
 .../store/jdbc/JDBCXACommitExceptionTest.java   |  31 +-
 .../org/apache/activemq/util/TestUtils.java     |  32 +-
 8 files changed, 507 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2a815c2e/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 23b5cfa..8cb6000 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -297,6 +297,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
 
         if (isCreateTablesOnStartup()) {
             TransactionContext transactionContext = getTransactionContext();
+            transactionContext.getExclusiveConnection();
             transactionContext.begin();
             try {
                 try {
@@ -344,6 +345,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
         try {
             LOG.debug("Cleaning up old messages.");
             c = getTransactionContext();
+            c.getExclusiveConnection();
             getAdapter().doDeleteOldMessages(c);
         } catch (IOException e) {
             LOG.warn("Old message cleanup failed due to: " + e, e);
@@ -549,6 +551,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
     @Override
     public void deleteAllMessages() throws IOException {
         TransactionContext c = getTransactionContext();
+        c.getExclusiveConnection();
         try {
             getAdapter().doDropTables(c);
             getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a815c2e/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
index c83b969..7b0f61c 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
@@ -17,12 +17,13 @@
 package org.apache.activemq.store.jdbc;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
 import java.util.LinkedList;
-import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.sql.DataSource;
 
@@ -47,14 +48,24 @@ public class TransactionContext {
     // a cheap dirty level that we can live with    
     private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
     private LinkedList<Runnable> completions = new LinkedList<Runnable>();
+    private ReentrantReadWriteLock exclusiveConnectionLock = new ReentrantReadWriteLock();
 
     public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
         this.persistenceAdapter = persistenceAdapter;
         this.dataSource = persistenceAdapter.getDataSource();
     }
 
+    public Connection getExclusiveConnection() throws IOException {
+        return lockAndWrapped(exclusiveConnectionLock.writeLock());
+    }
+
     public Connection getConnection() throws IOException {
+        return lockAndWrapped(exclusiveConnectionLock.readLock());
+    }
+
+    private Connection lockAndWrapped(Lock toLock) throws IOException {
         if (connection == null) {
+            toLock.lock();
             try {
                 connection = dataSource.getConnection();
                 if (persistenceAdapter.isChangeAutoCommitAllowed()) {
@@ -64,9 +75,15 @@ public class TransactionContext {
                         connection.setAutoCommit(autoCommit);
                     }
                 }
+                connection = new UnlockOnCloseConnection(connection, toLock);
             } catch (SQLException e) {
                 JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
                 inTx = false;
+                try {
+                    toLock.unlock();
+                } catch (IllegalMonitorStateException oops) {
+                    LOG.error("Thread does not hold the context lock on close of:"  + connection, oops);
+                }
                 close();
                 IOException ioe = IOExceptionSupport.create(e);
                 if (persistenceAdapter.getBrokerService() != null) {
@@ -260,4 +277,290 @@ public class TransactionContext {
     public void onCompletion(Runnable runnable) {
         completions.add(runnable);
     }
+
+    final private class UnlockOnCloseConnection implements Connection {
+
+        private final Connection delegate;
+        private final Lock lock;
+
+        UnlockOnCloseConnection(Connection delegate, Lock toUnlockOnClose) {
+            this.delegate = delegate;
+            this.lock = toUnlockOnClose;
+        }
+
+        @Override
+        public void close() throws SQLException {
+            try {
+                delegate.close();
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        // simple delegate for the  rest of the impl..
+        @Override
+        public Statement createStatement() throws SQLException {
+            return delegate.createStatement();
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql) throws SQLException {
+            return delegate.prepareStatement(sql);
+        }
+
+        @Override
+        public CallableStatement prepareCall(String sql) throws SQLException {
+            return delegate.prepareCall(sql);
+        }
+
+        @Override
+        public String nativeSQL(String sql) throws SQLException {
+            return delegate.nativeSQL(sql);
+        }
+
+        @Override
+        public void setAutoCommit(boolean autoCommit) throws SQLException {
+            delegate.setAutoCommit(autoCommit);
+        }
+
+        @Override
+        public boolean getAutoCommit() throws SQLException {
+            return delegate.getAutoCommit();
+        }
+
+        @Override
+        public void commit() throws SQLException {
+            delegate.commit();
+        }
+
+        @Override
+        public void rollback() throws SQLException {
+            delegate.rollback();
+        }
+
+        @Override
+        public boolean isClosed() throws SQLException {
+            return delegate.isClosed();
+        }
+
+        @Override
+        public DatabaseMetaData getMetaData() throws SQLException {
+            return delegate.getMetaData();
+        }
+
+        @Override
+        public void setReadOnly(boolean readOnly) throws SQLException {
+            delegate.setReadOnly(readOnly);
+        }
+
+        @Override
+        public boolean isReadOnly() throws SQLException {
+            return delegate.isReadOnly();
+        }
+
+        @Override
+        public void setCatalog(String catalog) throws SQLException {
+            delegate.setCatalog(catalog);
+        }
+
+        @Override
+        public String getCatalog() throws SQLException {
+            return delegate.getCatalog();
+        }
+
+        @Override
+        public void setTransactionIsolation(int level) throws SQLException {
+            delegate.setTransactionIsolation(level);
+        }
+
+        @Override
+        public int getTransactionIsolation() throws SQLException {
+            return delegate.getTransactionIsolation();
+        }
+
+        @Override
+        public SQLWarning getWarnings() throws SQLException {
+            return delegate.getWarnings();
+        }
+
+        @Override
+        public void clearWarnings() throws SQLException {
+            delegate.clearWarnings();
+        }
+
+        @Override
+        public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+            return delegate.createStatement(resultSetType, resultSetConcurrency);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+            return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency);
+        }
+
+        @Override
+        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+            return delegate.prepareCall(sql, resultSetType, resultSetConcurrency);
+        }
+
+        @Override
+        public Map<String, Class<?>> getTypeMap() throws SQLException {
+            return delegate.getTypeMap();
+        }
+
+        @Override
+        public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+            delegate.setTypeMap(map);
+        }
+
+        @Override
+        public void setHoldability(int holdability) throws SQLException {
+            delegate.setHoldability(holdability);
+        }
+
+        @Override
+        public int getHoldability() throws SQLException {
+            return delegate.getHoldability();
+        }
+
+        @Override
+        public Savepoint setSavepoint() throws SQLException {
+            return delegate.setSavepoint();
+        }
+
+        @Override
+        public Savepoint setSavepoint(String name) throws SQLException {
+            return delegate.setSavepoint(name);
+        }
+
+        @Override
+        public void rollback(Savepoint savepoint) throws SQLException {
+            delegate.rollback(savepoint);
+        }
+
+        @Override
+        public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+            delegate.releaseSavepoint(savepoint);
+        }
+
+        @Override
+        public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        @Override
+        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+            return delegate.prepareStatement(sql, autoGeneratedKeys);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+            return delegate.prepareStatement(sql, columnIndexes);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+            return delegate.prepareStatement(sql, columnNames);
+        }
+
+        @Override
+        public Clob createClob() throws SQLException {
+            return delegate.createClob();
+        }
+
+        @Override
+        public Blob createBlob() throws SQLException {
+            return delegate.createBlob();
+        }
+
+        @Override
+        public NClob createNClob() throws SQLException {
+            return delegate.createNClob();
+        }
+
+        @Override
+        public SQLXML createSQLXML() throws SQLException {
+            return delegate.createSQLXML();
+        }
+
+        @Override
+        public boolean isValid(int timeout) throws SQLException {
+            return delegate.isValid(timeout);
+        }
+
+        @Override
+        public void setClientInfo(String name, String value) throws SQLClientInfoException {
+            delegate.setClientInfo(name, value);
+        }
+
+        @Override
+        public void setClientInfo(Properties properties) throws SQLClientInfoException {
+            delegate.setClientInfo(properties);
+        }
+
+        @Override
+        public String getClientInfo(String name) throws SQLException {
+            return delegate.getClientInfo(name);
+        }
+
+        @Override
+        public Properties getClientInfo() throws SQLException {
+            return delegate.getClientInfo();
+        }
+
+        @Override
+        public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+            return delegate.createArrayOf(typeName, elements);
+        }
+
+        @Override
+        public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+            return delegate.createStruct(typeName, attributes);
+        }
+
+        @Override
+        public void setSchema(String schema) throws SQLException {
+            delegate.setSchema(schema);
+        }
+
+        @Override
+        public String getSchema() throws SQLException {
+            return delegate.getSchema();
+        }
+
+        @Override
+        public void abort(Executor executor) throws SQLException {
+            delegate.abort(executor);
+        }
+
+        @Override
+        public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+            delegate.setNetworkTimeout(executor, milliseconds);
+        }
+
+        @Override
+        public int getNetworkTimeout() throws SQLException {
+            return delegate.getNetworkTimeout();
+        }
+
+        @Override
+        public <T> T unwrap(Class<T> iface) throws SQLException {
+            return delegate.unwrap(iface);
+        }
+
+        @Override
+        public boolean isWrapperFor(Class<?> iface) throws SQLException {
+            return delegate.isWrapperFor(iface);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a815c2e/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
index 6d6721c..34c0a19 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
@@ -69,7 +69,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
     public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
                              long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
         PreparedStatement s = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             // Add the Blob record.
             s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
@@ -94,7 +93,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
             }
 
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -127,7 +125,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
     public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
 
             s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
@@ -149,7 +146,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
                 return os.toByteArray();
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a815c2e/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index 57438bc..8d76fe6 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -70,7 +70,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     //This is deprecated and should be removed in a future release
     protected boolean batchStatments = true;
     protected boolean prioritizedMessages;
-    protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
     protected int maxRows = MAX_ROWS;
 
     protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
@@ -83,20 +82,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
 
     @Override
     public void doCreateTables(TransactionContext transactionContext) throws SQLException, IOException {
-        cleanupExclusiveLock.writeLock().lock();
-        try {
-            // Check to see if the table already exists. If it does, then don't log warnings during startup.
-            // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table
-            boolean messageTableAlreadyExists = messageTableAlreadyExists(transactionContext);
-
-            for (String createStatement : this.statements.getCreateSchemaStatements()) {
-                // This will fail usually since the tables will be
-                // created already.
-                executeStatement(transactionContext, createStatement, messageTableAlreadyExists);
-            }
-
-        } finally {
-            cleanupExclusiveLock.writeLock().unlock();
+        // Check to see if the table already exists. If it does, then don't log warnings during startup.
+        // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table
+        boolean messageTableAlreadyExists = messageTableAlreadyExists(transactionContext);
+
+        for (String createStatement : this.statements.getCreateSchemaStatements()) {
+            // This will fail usually since the tables will be
+            // created already.
+            executeStatement(transactionContext, createStatement, messageTableAlreadyExists);
         }
     }
 
@@ -150,7 +143,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     @Override
     public void doDropTables(TransactionContext c) throws SQLException, IOException {
         Statement s = null;
-        cleanupExclusiveLock.writeLock().lock();
         try {
             s = c.getConnection().createStatement();
             String[] dropStatments = this.statements.getDropSchemaStatements();
@@ -169,7 +161,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             }
             commitIfAutoCommitIsDisabled(c);
         } finally {
-            cleanupExclusiveLock.writeLock().unlock();
             try {
                 s.close();
             } catch (Throwable e) {
@@ -181,7 +172,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
             rs = s.executeQuery();
@@ -200,7 +190,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             long seq = Math.max(seq1, seq2);
             return seq;
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -210,7 +199,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(
                     this.statements.getFindMessageByIdStatement());
@@ -221,7 +209,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             }
             return getBinaryData(rs, 1);
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -235,7 +222,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
                              long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
-        cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
                 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
@@ -264,7 +250,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 throw new SQLException("Failed add a message");
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             if (!this.batchStatements) {
                 if (s != null) {
                     s.close();
@@ -276,7 +261,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     @Override
     public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException {
         PreparedStatement s = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement());
             setBinaryData(s, 1, data);
@@ -287,7 +271,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 throw new IOException("Could not update message: " + id + " in " + destination);
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -297,7 +280,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
             long expirationTime, String messageRef) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
-        cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
                 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
@@ -317,7 +299,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 throw new SQLException("Failed add a message");
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             if (!this.batchStatements) {
                 s.close();
             }
@@ -328,7 +309,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
             s.setString(1, messageID.getProducerId().toString());
@@ -340,7 +320,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             }
             return new long[]{rs.getLong(1), rs.getLong(2)};
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -350,7 +329,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
             s.setString(1, id.getProducerId().toString());
@@ -361,7 +339,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             }
             return getBinaryData(rs, 1);
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -371,7 +348,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
             s.setLong(1, seq);
@@ -381,7 +357,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             }
             return rs.getString(1);
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -393,7 +368,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     @Override
     public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
         PreparedStatement s = c.getRemovedMessageStatement();
-        cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
                 s = c.getConnection().prepareStatement(xid == null ?
@@ -417,7 +391,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 throw new SQLException("Failed to remove message seq: " + seq);
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             if (!this.batchStatements && s != null) {
                 s.close();
             }
@@ -429,7 +402,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
             s.setString(1, destination.getQualifiedName());
@@ -448,7 +420,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 }
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -459,7 +430,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             JDBCMessageIdScanListener listener) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
             s.setMaxRows(limit);
@@ -476,7 +446,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 listener.messageId(id);
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -486,7 +455,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
                                          String subscriptionName, long seq, long priority) throws SQLException, IOException {
         PreparedStatement s = c.getUpdateLastAckStatement();
-        cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
                 s = c.getConnection().prepareStatement(xid == null ?
@@ -513,7 +481,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             if (!this.batchStatements) {
                 close(s);
             }
@@ -525,7 +492,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
                              String subscriptionName, long seq, long priority) throws SQLException, IOException {
         PreparedStatement s = c.getUpdateLastAckStatement();
-        cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
                 s = c.getConnection().prepareStatement(xid == null ?
@@ -553,7 +519,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                             + seq + ", for sub: " + subscriptionName);
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             if (!this.batchStatements) {
                 close(s);
             }
@@ -573,7 +538,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     @Override
     public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
         PreparedStatement s = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
             s.setString(1, destination.getQualifiedName());
@@ -584,7 +548,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -596,7 +559,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
         // destination.getQualifiedName(),clientId,subscriptionName);
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
             s.setString(1, destination.getQualifiedName());
@@ -617,7 +579,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 }
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -629,7 +590,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
 
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
             s.setMaxRows(Math.min(maxReturned * 2, maxRows));
@@ -653,7 +613,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 }
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -665,7 +624,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
 
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
             s.setMaxRows(Math.min(maxReturned * 2, maxRows));
@@ -690,7 +648,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 }
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -702,7 +659,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
         PreparedStatement s = null;
         ResultSet rs = null;
         int result = 0;
-        cleanupExclusiveLock.readLock().lock();
         try {
             if (isPrioritizedMessages) {
                 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
@@ -717,7 +673,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 result = rs.getInt(1);
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -737,7 +692,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
         // dumpTables(c, destination.getQualifiedName(), clientId,
         // subscriptionName);
         PreparedStatement s = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             long lastMessageId = -1;
             if (!retroactive) {
@@ -774,7 +728,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             }
 
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -784,7 +737,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             String clientId, String subscriptionName) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
             s.setString(1, destination.getQualifiedName());
@@ -803,7 +755,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                     ActiveMQDestination.QUEUE_TYPE));
             return subscription;
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -814,7 +765,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
             s.setString(1, destination.getQualifiedName());
@@ -832,7 +782,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             }
             return rc.toArray(new SubscriptionInfo[rc.size()]);
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -842,7 +791,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
             IOException {
         PreparedStatement s = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
             s.setString(1, destinationName.getQualifiedName());
@@ -852,7 +800,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             s.setString(1, destinationName.getQualifiedName());
             s.executeUpdate();
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -861,7 +808,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
             String subscriptionName) throws SQLException, IOException {
         PreparedStatement s = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
             s.setString(1, destination.getQualifiedName());
@@ -869,7 +815,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             s.setString(3, subscriptionName);
             s.executeUpdate();
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -878,17 +823,15 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     @Override
     public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
         PreparedStatement s = null;
-        cleanupExclusiveLock.writeLock().lock();
         try {
             LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
-            s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
+            s = c.getExclusiveConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
             int priority = priorityIterator++%10;
             s.setInt(1, priority);
             s.setInt(2, priority);
             int i = s.executeUpdate();
             LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
         } finally {
-            cleanupExclusiveLock.writeLock().unlock();
             close(s);
         }
     }
@@ -899,7 +842,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
         PreparedStatement s = null;
         ResultSet rs = null;
         long result = -1;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
             s.setString(1, destination.getQualifiedName());
@@ -913,7 +855,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 }
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -939,7 +880,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
         HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
             rs = s.executeQuery();
@@ -947,7 +887,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -1024,7 +963,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     @Override
     public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
         PreparedStatement s = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
             s.setString(1, destination.getQualifiedName());
@@ -1039,7 +977,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 throw new IOException("Could not create ack record for destination: " + destination);
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -1048,7 +985,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
             rs = s.executeQuery();
@@ -1080,7 +1016,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             }
         } finally {
             close(rs);
-            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -1088,7 +1023,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     @Override
     public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException {
         PreparedStatement s = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
             s.setLong(1, sequence);
@@ -1097,7 +1031,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -1109,7 +1042,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
         PreparedStatement s = null;
         ResultSet rs = null;
         int result = 0;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
             s.setString(1, destination.getQualifiedName());
@@ -1118,7 +1050,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 result = rs.getInt(1);
             }
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -1130,7 +1061,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             if (isPrioritizedMessages) {
                 s = c.getConnection().prepareStatement(limitQuery(this.statements.getFindNextMessagesByPriorityStatement()));
@@ -1172,7 +1102,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
         } catch (Exception e) {
             LOG.warn("Exception recovering next messages", e);
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -1183,7 +1112,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
             s.setString(1, id.toString());
@@ -1194,7 +1122,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             }
             return seq;
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a815c2e/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java b/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java
index 7d152bd..08f31f7 100644
--- a/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java
+++ b/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java
@@ -30,8 +30,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
 
 import org.apache.activemq.store.jdbc.Statements;
 import org.apache.activemq.store.jdbc.TransactionContext;
@@ -62,12 +60,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
 	private List<LoggingEvent> loggingEvents = new ArrayList<>();
 
 	@Mock
-	private ReadWriteLock readWriteLock;
-
-	@Mock
-	private Lock lock;
-
-	@Mock
 	private TransactionContext transactionContext;
 
 	@Mock(answer = RETURNS_DEEP_STUBS)
@@ -96,7 +88,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
 
 
 		defaultJDBCAdapter = new DefaultJDBCAdapter();
-		defaultJDBCAdapter.cleanupExclusiveLock = readWriteLock;
 		defaultJDBCAdapter.statements = statements;
 
 		when(statements.getCreateSchemaStatements()).thenReturn(CREATE_STATEMENTS);
@@ -104,7 +95,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
 		when(connection.getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),new String[] { "TABLE" })).thenReturn(resultSet);
 		when(connection.createStatement()).thenReturn(statement1, statement2);
 		when(connection.getAutoCommit()).thenReturn(true);
-		when(readWriteLock.writeLock()).thenReturn(lock);
 	}
 
 	@After
@@ -119,8 +109,7 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
 
 		defaultJDBCAdapter.doCreateTables(transactionContext);
 
-		InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2);
-		inOrder.verify(lock).lock();
+		InOrder inOrder = inOrder(resultSet, connection, statement1, statement2);
 		inOrder.verify(resultSet).next();
 		inOrder.verify(resultSet).close();
 		inOrder.verify(connection).createStatement();
@@ -129,7 +118,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
 		inOrder.verify(connection).createStatement();
 		inOrder.verify(statement2).execute(CREATE_STATEMENT2);
 		inOrder.verify(statement2).close();
-		inOrder.verify(lock).unlock();
 
 		assertEquals(4, loggingEvents.size());
 		assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);
@@ -145,8 +133,7 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
 
 		defaultJDBCAdapter.doCreateTables(transactionContext);
 
-		InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2);
-		inOrder.verify(lock).lock();
+		InOrder inOrder = inOrder(resultSet, connection, statement1, statement2);
 		inOrder.verify(resultSet).next();
 		inOrder.verify(resultSet).close();
 		inOrder.verify(connection).createStatement();
@@ -155,7 +142,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
 		inOrder.verify(connection).createStatement();
 		inOrder.verify(statement2).execute(CREATE_STATEMENT2);
 		inOrder.verify(statement2).close();
-		inOrder.verify(lock).unlock();
 
 		assertEquals(3, loggingEvents.size());
 		assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);
@@ -170,8 +156,7 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
 
 		defaultJDBCAdapter.doCreateTables(transactionContext);
 
-		InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2);
-		inOrder.verify(lock).lock();
+		InOrder inOrder = inOrder(resultSet, connection, statement1, statement2);
 		inOrder.verify(resultSet).next();
 		inOrder.verify(resultSet).close();
 		inOrder.verify(connection).createStatement();
@@ -182,7 +167,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
 		inOrder.verify(statement2).execute(CREATE_STATEMENT2);
 		inOrder.verify(connection).commit();
 		inOrder.verify(statement2).close();
-		inOrder.verify(lock).unlock();
 
 		assertEquals(2, loggingEvents.size());
 		assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a815c2e/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCleanupLimitedPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCleanupLimitedPoolTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCleanupLimitedPoolTest.java
new file mode 100644
index 0000000..e704d52
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCleanupLimitedPoolTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import org.apache.activemq.ActiveMQXAConnection;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.apache.derby.jdbc.EmbeddedDriver;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.MessageProducer;
+import javax.jms.XASession;
+import javax.sql.DataSource;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.activemq.util.TestUtils.createXid;
+
+
+public class JDBCCleanupLimitedPoolTest {
+
+    BrokerService broker;
+    JDBCPersistenceAdapter jdbcPersistenceAdapter;
+    BasicDataSource pool;
+    EmbeddedDataSource derby;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("derby.system.home", new File(IOHelper.getDefaultDataDirectory()).getCanonicalPath());
+        derby = new EmbeddedDataSource();
+        derby.setDatabaseName("derbyDb");
+        derby.setCreateDatabase("create");
+
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+        pool.close();
+        DataSourceServiceSupport.shutdownDefaultDataSource(derby);
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+        jdbcPersistenceAdapter.deleteAllMessages();
+        jdbcPersistenceAdapter.setCleanupPeriod(0);
+        jdbcPersistenceAdapter.setUseLock(false);
+        pool = new BasicDataSource();
+        pool.setDriverClassName(EmbeddedDriver.class.getCanonicalName());
+        pool.setUrl("jdbc:derby:derbyDb;create=false");
+        pool.setUsername("uid");
+        pool.setPassword("pwd");
+        pool.setMaxTotal(2);
+        jdbcPersistenceAdapter.setDataSource(pool);
+        broker.setPersistenceAdapter(jdbcPersistenceAdapter);
+        broker.addConnector("tcp://0.0.0.0:0");
+        return broker;
+    }
+
+
+    @Test
+    public void testNoDeadlockOnXaPoolExhaustion() throws Exception {
+        final CountDownLatch done = new CountDownLatch(1);
+        final CountDownLatch doneCommit = new CountDownLatch(2000);
+
+        final ActiveMQXAConnectionFactory factory = new ActiveMQXAConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        // some contention over pool of 2
+        for (int i = 0; i < 3; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ActiveMQXAConnection conn = (ActiveMQXAConnection) factory.createXAConnection();
+                        conn.start();
+                        XASession sess = conn.createXASession();
+                        while (done.getCount() > 0 && doneCommit.getCount() > 0) {
+                            Xid xid = createXid();
+                            sess.getXAResource().start(xid, XAResource.TMNOFLAGS);
+                            MessageProducer producer = sess.createProducer(sess.createQueue("test"));
+                            producer.send(sess.createTextMessage("test"));
+                            sess.getXAResource().end(xid, XAResource.TMSUCCESS);
+                            sess.getXAResource().prepare(xid);
+                            sess.getXAResource().commit(xid, false);
+                            doneCommit.countDown();
+                        }
+
+                        conn.close();
+
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
+                    }
+                }
+            });
+        }
+
+
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    while (!done.await(10, TimeUnit.MILLISECONDS) && doneCommit.getCount() > 0) {
+                        jdbcPersistenceAdapter.cleanup();
+                    }
+                } catch (Exception ignored) {
+                }
+
+            }
+        });
+
+        executorService.shutdown();
+        boolean allComplete = executorService.awaitTermination(20, TimeUnit.SECONDS);
+        done.countDown();
+        assertTrue("all complete", allComplete);
+        executorService.shutdownNow();
+
+        assertTrue("xa tx done", doneCommit.await(10, TimeUnit.SECONDS));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a815c2e/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
index 2713f9f..b8bb9ee 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
@@ -34,12 +34,12 @@ import org.apache.activemq.ActiveMQXAConnectionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.activemq.util.TestUtils.createXid;
+
 // https://issues.apache.org/activemq/browse/AMQ-2880
 public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
     private static final Logger LOG = LoggerFactory.getLogger(JDBCXACommitExceptionTest.class);
 
-    private long txGenerator = System.currentTimeMillis();
-
     protected ActiveMQXAConnectionFactory factory;
 
     boolean onePhase = true;
@@ -128,32 +128,5 @@ public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
         return messagesReceived;
     }
 
-    public Xid createXid() throws IOException {
-
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream os = new DataOutputStream(baos);
-        os.writeLong(++txGenerator);
-        os.close();
-        final byte[] bs = baos.toByteArray();
-
-        return new Xid() {
-            @Override
-            public int getFormatId() {
-                return 86;
-            }
-
-            @Override
-            public byte[] getGlobalTransactionId() {
-                return bs;
-            }
-
-            @Override
-            public byte[] getBranchQualifier() {
-                return bs;
-            }
-        };
-
-    }
-
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a815c2e/activemq-unit-tests/src/test/java/org/apache/activemq/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/util/TestUtils.java b/activemq-unit-tests/src/test/java/org/apache/activemq/util/TestUtils.java
index dd181ef..be9a6fb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/util/TestUtils.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/util/TestUtils.java
@@ -16,13 +16,15 @@
  */
 package org.apache.activemq.util;
 
-import java.io.IOException;
+import java.io.*;
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.ServerSocketFactory;
+import javax.transaction.xa.Xid;
 
 public class TestUtils {
 
@@ -65,4 +67,32 @@ public class TestUtils {
 
         return ports;
     }
+
+    private static AtomicLong txGenerator = new AtomicLong(System.currentTimeMillis());
+    public static Xid createXid() throws IOException {
+
+        java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(txGenerator.incrementAndGet());
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            @Override
+            public int getFormatId() {
+                return 86;
+            }
+
+            @Override
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            @Override
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+    }
+
 }


Mime
View raw message