Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1633E18679 for ; Wed, 27 Jan 2016 17:18:06 +0000 (UTC) Received: (qmail 49283 invoked by uid 500); 27 Jan 2016 17:17:59 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 49239 invoked by uid 500); 27 Jan 2016 17:17:59 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 49230 invoked by uid 99); 27 Jan 2016 17:17:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Jan 2016 17:17:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8A03FE00D6; Wed, 27 Jan 2016 17:17:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 27 Jan 2016 17:18:00 -0000 Message-Id: <32c2fc9c4646493999320761bd49677a@git.apache.org> In-Reply-To: <07d4df149cbc40d1bf854cb0e08bebe9@git.apache.org> References: <07d4df149cbc40d1bf854cb0e08bebe9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] activemq-artemis git commit: Handful of JDBC Journal Fixes Handful of JDBC Journal Fixes This patch fixes a number of bugs with the JDBC Journal implementation. Mainly around how it was handling transactions. The XA transactions tests are now enabled to test both the File and Database store. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5383a0c4 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5383a0c4 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5383a0c4 Branch: refs/heads/master Commit: 5383a0c409a7575c1af92755ed733e2424f1f319 Parents: bd26115 Author: Martyn Taylor Authored: Tue Jan 26 11:13:03 2016 +0000 Committer: Clebert Suconic Committed: Wed Jan 27 12:17:55 2016 -0500 ---------------------------------------------------------------------- artemis-jdbc-store/pom.xml | 6 + .../activemq/artemis/jdbc/store/JDBCUtils.java | 71 ++++++ .../jdbc/store/journal/JDBCJournalImpl.java | 246 ++++++++++++++----- .../journal/JDBCJournalLoaderCallback.java | 2 - .../journal/JDBCJournalReaderCallback.java | 46 ++-- .../jdbc/store/journal/JDBCJournalRecord.java | 65 ++++- .../jdbc/store/journal/JDBCJournalSync.java | 6 +- .../artemis/tests/util/ThreadLeakCheckRule.java | 8 + .../jdbc/store/journal/JDBCJournalTest.java | 3 +- .../integration/xa/BasicXaRecoveryTest.java | 31 ++- .../tests/integration/xa/BasicXaTest.java | 26 +- .../tests/integration/xa/XaTimeoutTest.java | 52 +++- 12 files changed, 442 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/artemis-jdbc-store/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml index caf2f8f..a0430b4 100644 --- a/artemis-jdbc-store/pom.xml +++ b/artemis-jdbc-store/pom.xml @@ -70,5 +70,11 @@ ${project.version} + + junit + junit + test + + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java new file mode 100644 index 0000000..c2ffa96 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.List; + +import org.apache.derby.jdbc.AutoloadedDriver; + +public class JDBCUtils { + + public static Driver getDriver() throws Exception { + Driver dbDriver = null; + // Load Database driver, sets Derby Autoloaded Driver as lowest priority. + List drivers = Collections.list(DriverManager.getDrivers()); + if (drivers.size() <= 2 && drivers.size() > 0) { + dbDriver = drivers.get(0); + boolean isDerby = dbDriver instanceof AutoloadedDriver; + + if (drivers.size() > 1 && isDerby) { + dbDriver = drivers.get(1); + } + + if (isDerby) { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + DriverManager.getConnection("jdbc:derby:;shutdown=true"); + } + catch (Exception e) { + } + } + }); + } + } + else { + String error = drivers.isEmpty() ? "No DB driver found on class path" : "Too many DB drivers on class path, not sure which to use"; + throw new RuntimeException(error); + } + return dbDriver; + } + + public static void createTableIfNotExists(Connection connection, String tableName, String sql) throws SQLException { + ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null); + if (!rs.next()) { + Statement statement = connection.createStatement(); + statement.executeUpdate(sql); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 7309c94..ad0f869 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -19,17 +19,16 @@ package org.apache.activemq.artemis.jdbc.store.journal; import java.sql.Connection; import java.sql.Driver; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Timer; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -43,7 +42,8 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.JournalFile; -import org.apache.derby.jdbc.AutoloadedDriver; +import org.apache.activemq.artemis.jdbc.store.JDBCUtils; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; public class JDBCJournalImpl implements Journal { @@ -66,7 +66,7 @@ public class JDBCJournalImpl implements Journal { private PreparedStatement deleteJournalRecords; - private PreparedStatement deleteTxJournalRecords; + private PreparedStatement deleteJournalTxRecords; private boolean started; @@ -78,61 +78,35 @@ public class JDBCJournalImpl implements Journal { private final ReadWriteLock journalLock = new ReentrantReadWriteLock(); - private boolean isDerby = false; + private final String timerThread; + + // Track Tx Records + private Map transactions = new ConcurrentHashMap<>(); + + private boolean isLoaded = false; public JDBCJournalImpl(String jdbcUrl, String tableName) { this.tableName = tableName; this.jdbcUrl = jdbcUrl; + timerThread = "Timer JDBC Journal(" + tableName + ")"; records = new ArrayList(); } @Override public void start() throws Exception { - // Load Database driver, sets Derby Autoloaded Driver as lowest priority. - List drivers = Collections.list(DriverManager.getDrivers()); - if (drivers.size() <= 2 && drivers.size() > 0) { - dbDriver = drivers.get(0); - isDerby = dbDriver instanceof AutoloadedDriver; - - if (drivers.size() > 1 && isDerby) { - dbDriver = drivers.get(1); - } - - if (isDerby) { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - DriverManager.getConnection("jdbc:derby:;shutdown=true"); - } - catch (Exception e) { - } - } - }); - } - } - else { - String error = drivers.isEmpty() ? "No DB driver found on class path" : "Too many DB drivers on class path, not sure which to use"; - throw new RuntimeException(error); - } - + dbDriver = JDBCUtils.getDriver(); connection = dbDriver.connect(jdbcUrl, new Properties()); - // If JOURNAL table doesn't exist then create it - ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null); - if (!rs.next()) { - Statement statement = connection.createStatement(); - statement.executeUpdate(JDBCJournalRecord.createTableSQL(tableName)); - } + JDBCUtils.createTableIfNotExists(connection, tableName, JDBCJournalRecord.createTableSQL(tableName)); insertJournalRecords = connection.prepareStatement(JDBCJournalRecord.insertRecordsSQL(tableName)); selectJournalRecords = connection.prepareStatement(JDBCJournalRecord.selectRecordsSQL(tableName)); countJournalRecords = connection.prepareStatement("SELECT COUNT(*) FROM " + tableName); deleteJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteRecordsSQL(tableName)); - deleteTxJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteTxRecordsSQL(tableName)); + deleteJournalTxRecords = connection.prepareStatement(JDBCJournalRecord.deleteJournalTxRecordsSQL(tableName)); - syncTimer = new Timer(); + syncTimer = new Timer(timerThread, true); syncTimer.scheduleAtFixedRate(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY); started = true; @@ -145,12 +119,17 @@ public class JDBCJournalImpl implements Journal { public synchronized void stop(boolean shutdownConnection) throws Exception { if (started) { + journalLock.writeLock().lock(); + syncTimer.cancel(); + sync(); if (shutdownConnection) { connection.close(); } + started = false; + journalLock.writeLock().unlock(); } } @@ -158,47 +137,134 @@ public class JDBCJournalImpl implements Journal { connection.setAutoCommit(false); Statement statement = connection.createStatement(); statement.executeUpdate("DROP TABLE " + tableName); + statement.close(); connection.commit(); stop(); } - public int sync() throws SQLException { + public synchronized int sync() { + if (!started) + return 0; + List recordRef = records; records = new ArrayList(); - for (JDBCJournalRecord record : recordRef) { - record.storeLineUp(); - - switch (record.getRecordType()) { - case JDBCJournalRecord.DELETE_RECORD: - record.writeDeleteRecord(deleteJournalRecords); - break; - case JDBCJournalRecord.DELETE_RECORD_TX: - record.writeDeleteTxRecord(deleteTxJournalRecords); - break; - default: - record.writeRecord(insertJournalRecords); - break; + // We keep a list of deleted records (used for cleaning up old transaction data). + List deletedRecords = new ArrayList<>(); + + TransactionHolder holder; + + boolean success = false; + try { + for (JDBCJournalRecord record : recordRef) { + record.storeLineUp(); + + switch (record.getRecordType()) { + case JDBCJournalRecord.DELETE_RECORD: + // Standard SQL Delete Record, Non transactional delete + deletedRecords.add(record.getId()); + record.writeDeleteRecord(deleteJournalRecords); + break; + case JDBCJournalRecord.ROLLBACK_RECORD: + // Roll back we remove all records associated with this TX ID. This query is always performed last. + holder = transactions.get(record.getTxId()); + deleteJournalTxRecords.setLong(1, record.getTxId()); + deleteJournalTxRecords.addBatch(); + break; + case JDBCJournalRecord.COMMIT_RECORD: + // We perform all the deletes and add the commit record in the same Database TX + holder = transactions.get(record.getTxId()); + for (RecordInfo info : holder.recordsToDelete) { + deletedRecords.add(record.getId()); + deleteJournalRecords.setLong(1, info.id); + deleteJournalRecords.addBatch(); + } + record.writeRecord(insertJournalRecords); + break; + default: + // Default we add a new record to the DB + record.writeRecord(insertJournalRecords); + break; + } } } + catch (SQLException e) { + executeCallbacks(recordRef, success); + return 0; + } - boolean success = false; try { connection.setAutoCommit(false); + insertJournalRecords.executeBatch(); deleteJournalRecords.executeBatch(); - deleteTxJournalRecords.executeBatch(); + deleteJournalTxRecords.executeBatch(); + connection.commit(); + + cleanupTxRecords(deletedRecords); success = true; } catch (SQLException e) { - connection.rollback(); - e.printStackTrace(); + performRollback(connection, recordRef); } + executeCallbacks(recordRef, success); return recordRef.size(); } + /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, + we remove the Tx Records (i.e. PREPARE, COMMIT). */ + private void cleanupTxRecords(List deletedRecords) throws SQLException { + + List iterableCopy; + List iterableCopyTx = new ArrayList<>(); + iterableCopyTx.addAll(transactions.values()); + + // TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop + for (TransactionHolder h : iterableCopyTx) { + + iterableCopy = new ArrayList<>(); + iterableCopy.addAll(h.recordInfos); + + for (RecordInfo info : iterableCopy) { + if (deletedRecords.contains(info.id)) { + h.recordInfos.remove(info); + } + } + + if (h.recordInfos.isEmpty()) { + deleteJournalTxRecords.setLong(1, h.transactionID); + deleteJournalTxRecords.addBatch(); + transactions.remove(h.transactionID); + } + } + } + + private void performRollback(Connection connection, List records) { + try { + connection.rollback(); + for (JDBCJournalRecord record : records) { + if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { + removeTxRecord(record); + } + } + + List txHolders = new ArrayList<>(); + txHolders.addAll(transactions.values()); + + // On rollback we must update the tx map to remove all the tx entries + for (TransactionHolder txH : txHolders) { + if (txH.prepared == false && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) { + transactions.remove(txH.transactionID); + } + } + } + catch (Exception sqlE) { + ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE); + } + } + // TODO Use an executor. private void executeCallbacks(final List records, final boolean result) { Runnable r = new Runnable() { @@ -213,9 +279,12 @@ public class JDBCJournalImpl implements Journal { t.start(); } - private void appendRecord(JDBCJournalRecord record) throws SQLException { + private synchronized void appendRecord(JDBCJournalRecord record) { try { journalLock.writeLock().lock(); + if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { + addTxRecord(record); + } records.add(record); } finally { @@ -223,6 +292,46 @@ public class JDBCJournalImpl implements Journal { } } + private void addTxRecord(JDBCJournalRecord record) { + TransactionHolder txHolder = transactions.get(record.getTxId()); + if (txHolder == null) { + txHolder = new TransactionHolder(record.getTxId()); + transactions.put(record.getTxId(), txHolder); + } + + // We actually only need the record ID in this instance. + if (record.isTransactional()) { + RecordInfo info = new RecordInfo(record.getTxId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount()); + if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) { + txHolder.recordsToDelete.add(info); + } + else { + txHolder.recordInfos.add(info); + } + } + else { + txHolder.prepared = true; + } + } + + private void removeTxRecord(JDBCJournalRecord record) { + TransactionHolder txHolder = transactions.get(record.getTxId()); + + // We actually only need the record ID in this instance. + if (record.isTransactional()) { + RecordInfo info = new RecordInfo(record.getTxId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount()); + if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) { + txHolder.recordsToDelete.remove(info); + } + else { + txHolder.recordInfos.remove(info); + } + } + else { + txHolder.prepared = false; + } + } + @Override public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); @@ -369,14 +478,14 @@ public class JDBCJournalImpl implements Journal { @Override public void appendCommitRecord(long txID, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); r.setTxId(txID); appendRecord(r); } @Override public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); r.setTxId(txID); r.setIoCompletion(callback); appendRecord(r); @@ -387,7 +496,7 @@ public class JDBCJournalImpl implements Journal { boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); r.setTxId(txID); r.setStoreLineUp(lineUpContext); r.setIoCompletion(callback); @@ -396,7 +505,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD); r.setTxId(txID); r.setTxData(transactionData); r.setSync(sync); @@ -411,6 +520,7 @@ public class JDBCJournalImpl implements Journal { JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD); r.setTxId(txID); r.setTxData(transactionData); + r.setTxData(transactionData); r.setSync(sync); r.setIoCompletion(callback); appendRecord(r); @@ -435,7 +545,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD); r.setTxId(txID); r.setSync(sync); r.setIoCompletion(callback); @@ -443,7 +553,7 @@ public class JDBCJournalImpl implements Journal { } @Override - public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception { + public synchronized JournalLoadInformation load(LoaderCallback reloadManager) throws Exception { JournalLoadInformation jli = new JournalLoadInformation(); JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager); JDBCJournalRecord r; @@ -485,8 +595,12 @@ public class JDBCJournalImpl implements Journal { } noRecords++; } + jrc.checkPreparedTx(); + jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId()); jli.setNumberOfRecords(noRecords); + transactions = jrc.getTransactions(); + isLoaded = true; } return jli; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallback.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallback.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallback.java index 63a7d40..12b8671 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallback.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallback.java @@ -81,7 +81,6 @@ public class JDBCJournalLoaderCallback implements LoaderCallback { public synchronized void updateRecord(final RecordInfo info) { int index = committedRecords.size(); committedRecords.add(index, info); - deleteReferences.get(info.id).add(index); } public synchronized void deleteRecord(final long id) { @@ -106,5 +105,4 @@ public class JDBCJournalLoaderCallback implements LoaderCallback { public long getMaxId() { return maxId; } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java index 6e078be..844fd4a 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java @@ -19,14 +19,13 @@ package org.apache.activemq.artemis.jdbc.store.journal; import java.util.LinkedHashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; -import org.apache.activemq.artemis.core.journal.impl.JournalTransaction; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; public class JDBCJournalReaderCallback implements JournalReaderCallback { @@ -34,9 +33,7 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback { private final LoaderCallback loadManager; - private final ConcurrentMap transactions = new ConcurrentHashMap(); - - public JDBCJournalReaderCallback(LoaderCallback loadManager) { + public JDBCJournalReaderCallback(final LoaderCallback loadManager) { this.loadManager = loadManager; } @@ -53,7 +50,12 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback { } public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception { - onReadAddRecordTX(transactionID, info); + TransactionHolder tx = loadTransactions.get(transactionID); + if (tx == null) { + tx = new TransactionHolder(transactionID); + loadTransactions.put(transactionID, tx); + } + tx.recordInfos.add(info); } public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception { @@ -87,14 +89,9 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback { } public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception { + // It is possible that the TX could be null, since deletes could have happened in the journal. TransactionHolder tx = loadTransactions.remove(transactionID); if (tx != null) { - // JournalTransaction journalTransaction = transactions.remove(transactionID); - // if (journalTransaction == null) - // { - // throw new IllegalStateException("Cannot Commit, tx not found with ID: " + transactionID); - // } - for (RecordInfo txRecord : tx.recordInfos) { if (txRecord.isUpdate) { loadManager.updateRecord(txRecord); @@ -103,10 +100,6 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback { loadManager.addRecord(txRecord); } } - - for (RecordInfo deleteValue : tx.recordsToDelete) { - loadManager.deleteRecord(deleteValue.id); - } } } @@ -121,4 +114,23 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback { public void markAsDataFile(JournalFile file) { // Not needed for JDBC journal impl } + + public void checkPreparedTx() { + for (TransactionHolder transaction : loadTransactions.values()) { + if (!transaction.prepared || transaction.invalid) { + ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID); + loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete); + } + else { + PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData); + info.getRecords().addAll(transaction.recordInfos); + info.getRecordsToDelete().addAll(transaction.recordsToDelete); + loadManager.addPreparedTransaction(info); + } + } + } + + public Map getTransactions() { + return loadTransactions; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index 30f02da..f5e76a3 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream; public class JDBCJournalRecord { @@ -85,10 +86,14 @@ public class JDBCJournalRecord { private boolean isUpdate; + private boolean isTransactional; + public JDBCJournalRecord(long id, byte recordType) { this.id = id; this.recordType = recordType; - this.isUpdate = recordType == UPDATE_RECORD || recordType == UPDATE_RECORD_TX; + + isUpdate = recordType == UPDATE_RECORD || recordType == UPDATE_RECORD_TX; + isTransactional = recordType == UPDATE_RECORD_TX || recordType == ADD_RECORD_TX || recordType == DELETE_RECORD_TX; // set defaults compactCount = 0; @@ -102,11 +107,12 @@ public class JDBCJournalRecord { } public static String createTableSQL(String tableName) { - return "CREATE TABLE " + tableName + "(id BIGINT, " + "recordType SMALLINT, " + "compactCount SMALLINT, " + "txId BIGINT, " + "userRecordType SMALLINT, " + "variableSize INTEGER, " + "record BLOB, " + "txDataSize INTEGER, " + "txData BLOB, " + "txCheckNoRecords INTEGER)"; + return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,timestamp BIGINT)"; } public static String insertRecordsSQL(String tableName) { - return "INSERT INTO " + tableName + "(id," + "recordType," + "compactCount," + "txId," + "userRecordType," + "variableSize," + "record," + "txDataSize," + "txData," + "txCheckNoRecords) " + "VALUES (?,?,?,?,?,?,?,?,?,?)"; + return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,timestamp) " + + "VALUES (?,?,?,?,?,?,?,?,?,?,?)"; } public static String selectRecordsSQL(String tableName) { @@ -117,8 +123,20 @@ public class JDBCJournalRecord { return "DELETE FROM " + tableName + " WHERE id = ?"; } - public static String deleteTxRecordsSQL(String tableName) { - return "DELETE FROM " + tableName + " WHERE txId = ?"; + public static String deleteCommittedDeleteRecordsForTxSQL(String tableName) { + return "DELETE FROM " + tableName + " WHERE id IN (SELECT id FROM " + tableName + " WHERE txID=?)"; + } + + public static String deleteCommittedTxRecordsSQL(String tableName) { + return "DELETE FROM " + tableName + " WHERE txId=? AND (recordType=" + PREPARE_RECORD + " OR recordType=" + COMMIT_RECORD + ")"; + } + + public static String deleteJournalTxRecordsSQL(String tableName) { + return "DELETE FROM " + tableName + " WHERE txId=?"; + } + + public static String deleteRolledBackTxSQL(String tableName) { + return "DELETE FROM " + tableName + " WHERE txId=?"; } public void complete(boolean success) { @@ -127,7 +145,7 @@ public class JDBCJournalRecord { ioCompletion.done(); } else { - ioCompletion.onError(1, "DATABASE INSERT FAILED"); + ioCompletion.onError(1, "DATABASE TRANSACTION FAILED"); } } } @@ -139,16 +157,29 @@ public class JDBCJournalRecord { } protected void writeRecord(PreparedStatement statement) throws SQLException { + + byte[] recordBytes = new byte[variableSize]; + byte[] txDataBytes = new byte[txDataSize]; + + try { + record.read(recordBytes); + txData.read(txDataBytes); + } + catch (IOException e) { + ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e); + } + statement.setLong(1, id); statement.setByte(2, recordType); statement.setByte(3, compactCount); statement.setLong(4, txId); statement.setByte(5, userRecordType); statement.setInt(6, variableSize); - statement.setBlob(7, record); + statement.setBytes(7, recordBytes); statement.setInt(8, txDataSize); - statement.setBlob(9, txData); + statement.setBytes(9, txDataBytes); statement.setInt(10, txCheckNoRecords); + statement.setLong(11, System.currentTimeMillis()); statement.addBatch(); } @@ -240,8 +271,10 @@ public class JDBCJournalRecord { } public void setRecord(byte[] record) { - this.variableSize = record.length; - this.record = new ByteArrayInputStream(record); + if (record != null) { + this.variableSize = record.length; + this.record = new ByteArrayInputStream(record); + } } public void setRecord(InputStream record) { @@ -287,14 +320,16 @@ public class JDBCJournalRecord { public void setTxData(EncodingSupport txData) { this.txDataSize = txData.getEncodeSize(); - ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize); + ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(txDataSize); txData.encode(encodedBuffer); this.txData = new ActiveMQBufferInputStream(encodedBuffer); } public void setTxData(byte[] txData) { - this.txDataSize = txData.length; - this.txData = new ByteArrayInputStream(txData); + if (txData != null) { + this.txDataSize = txData.length; + this.txData = new ByteArrayInputStream(txData); + } } public boolean isUpdate() { @@ -316,4 +351,8 @@ public class JDBCJournalRecord { public RecordInfo toRecordInfo() throws IOException { return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), getCompactCount()); } + + public boolean isTransactional() { + return isTransactional; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java index 551510a..a224625 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.jdbc.store.journal; -import java.sql.SQLException; import java.util.TimerTask; public class JDBCJournalSync extends TimerTask { @@ -30,11 +29,8 @@ public class JDBCJournalSync extends TimerTask { @Override public void run() { - try { + if (journal.isStarted()) { journal.sync(); } - catch (SQLException e) { - e.printStackTrace(); - } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java index b87f5a7..846d31b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java @@ -171,6 +171,14 @@ public class ThreadLeakCheckRule extends ExternalResource { //another netty thread return true; } + else if (threadName.contains("derby")) { + // The derby engine is initialized once, and lasts the lifetime of the VM + return true; + } + else if (threadName.contains("Timer")) { + // The timer threads in Derby and JDBC use daemon and shutdown once user threads exit. + return true; + } else if (threadName.contains("hawtdispatch")) { // Static workers used by MQTT client. return true; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java index ed563ed..6d58ed8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java @@ -53,12 +53,11 @@ public class JDBCJournalTest { public void testInsertRecords() throws Exception { int noRecords = 10; for (int i = 0; i < noRecords; i++) { - journal.appendAddRecord(1, (byte) 1, new byte[0], true); + journal.appendAddRecord(i, (byte) 1, new byte[0], true); } Thread.sleep(3000); assertEquals(noRecords, journal.getNumberOfRecords()); - } @Test http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java index 61069ac..d69a0dc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; @@ -38,15 +39,19 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import javax.management.MBeanServer; import javax.management.MBeanServerFactory; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; +@RunWith(Parameterized.class) public class BasicXaRecoveryTest extends ActiveMQTestBase { private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER; @@ -71,13 +76,32 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase { private MBeanServer mbeanServer; + protected StoreConfiguration.StoreType storeType; + + public BasicXaRecoveryTest(StoreConfiguration.StoreType storeType) { + this.storeType = storeType; + } + + @Parameterized.Parameters(name = "storeType") + public static Collection data() { + Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}}; + return Arrays.asList(params); + } + @Override @Before public void setUp() throws Exception { super.setUp(); addressSettings.clear(); - configuration = createDefaultInVMConfig().setJMXManagementEnabled(true); + + if (storeType == StoreConfiguration.StoreType.DATABASE) { + configuration = createDefaultJDBCConfig().setJMXManagementEnabled(true); + } + else { + configuration = createDefaultInVMConfig().setJMXManagementEnabled(true); + } + mbeanServer = MBeanServerFactory.createMBeanServer(); @@ -211,15 +235,18 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase { @Test public void testPagingServerRestarted() throws Exception { + if (storeType == StoreConfiguration.StoreType.DATABASE) return; verifyPaging(true); } @Test public void testPaging() throws Exception { + if (storeType == StoreConfiguration.StoreType.DATABASE) return; verifyPaging(false); } public void verifyPaging(final boolean restartServer) throws Exception { + if (storeType == StoreConfiguration.StoreType.DATABASE) return; Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); SimpleString pageQueue = new SimpleString("pagequeue"); @@ -285,11 +312,13 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase { @Test public void testRollbackPaging() throws Exception { + if (storeType == StoreConfiguration.StoreType.DATABASE) return; testRollbackPaging(false); } @Test public void testRollbackPagingServerRestarted() throws Exception { + if (storeType == StoreConfiguration.StoreType.DATABASE) return; testRollbackPaging(true); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java index 6c67a76..9b6abac 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java @@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.integration.xa; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -34,6 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; @@ -44,7 +47,10 @@ import org.apache.activemq.artemis.utils.UUIDGenerator; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class BasicXaTest extends ActiveMQTestBase { private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER; @@ -63,13 +69,31 @@ public class BasicXaTest extends ActiveMQTestBase { private ServerLocator locator; + private StoreConfiguration.StoreType storeType; + + public BasicXaTest(StoreConfiguration.StoreType storeType) { + this.storeType = storeType; + } + + @Parameterized.Parameters(name = "storeType") + public static Collection data() { + Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}}; + return Arrays.asList(params); + } + @Override @Before public void setUp() throws Exception { super.setUp(); addressSettings.clear(); - configuration = createDefaultNettyConfig(); + + if (storeType == StoreConfiguration.StoreType.DATABASE) { + configuration = createDefaultJDBCConfig(); + } + else { + configuration = createDefaultNettyConfig(); + } messagingService = createServer(false, configuration, -1, -1, addressSettings); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5383a0c4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/XaTimeoutTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/XaTimeoutTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/XaTimeoutTest.java index b6d679a..3e0ea15 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/XaTimeoutTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/XaTimeoutTest.java @@ -16,6 +16,18 @@ */ package org.apache.activemq.artemis.tests.integration.xa; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.SimpleString; @@ -27,7 +39,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -42,17 +55,10 @@ import org.apache.activemq.artemis.utils.UUIDGenerator; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - +@RunWith(Parameterized.class) public class XaTimeoutTest extends ActiveMQTestBase { private final Map addressSettings = new HashMap<>(); @@ -67,19 +73,39 @@ public class XaTimeoutTest extends ActiveMQTestBase { private ClientSessionFactory sessionFactory; - private ConfigurationImpl configuration; + private Configuration configuration; private final SimpleString atestq = new SimpleString("atestq"); private ServerLocator locator; + private StoreConfiguration.StoreType storeType; + + public XaTimeoutTest(StoreConfiguration.StoreType storeType) { + this.storeType = storeType; + } + + @Parameterized.Parameters(name = "storeType") + public static Collection data() { + Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}}; + return Arrays.asList(params); + } + @Override @Before public void setUp() throws Exception { super.setUp(); addressSettings.clear(); - configuration = createBasicConfig().setTransactionTimeoutScanPeriod(500).addAcceptorConfiguration(new TransportConfiguration(ActiveMQTestBase.INVM_ACCEPTOR_FACTORY)); + + if (storeType == StoreConfiguration.StoreType.DATABASE) { + configuration = createDefaultJDBCConfig(); + } + else { + configuration = createBasicConfig(); + } + configuration.setTransactionTimeoutScanPeriod(500).addAcceptorConfiguration(new TransportConfiguration(ActiveMQTestBase.INVM_ACCEPTOR_FACTORY)); + server = addServer(ActiveMQServers.newActiveMQServer(configuration, false)); // start the server server.start();