Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4C154200C68 for ; Wed, 19 Apr 2017 07:42:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4ADC6160BAF; Wed, 19 Apr 2017 05:42:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CB9E8160BA1 for ; Wed, 19 Apr 2017 07:42:48 +0200 (CEST) Received: (qmail 31652 invoked by uid 500); 19 Apr 2017 05:42:47 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 31630 invoked by uid 99); 19 Apr 2017 05:42:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Apr 2017 05:42:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 97DA1DFDAC; Wed, 19 Apr 2017 05:42:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andytaylor@apache.org To: commits@activemq.apache.org Date: Wed, 19 Apr 2017 05:42:48 -0000 Message-Id: In-Reply-To: <436f82aa29104261b2285bf8550c857d@git.apache.org> References: <436f82aa29104261b2285bf8550c857d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] activemq-artemis git commit: ARTEMIS-1115 Call CriticalIOListener on JDBC Error archived-at: Wed, 19 Apr 2017 05:42:50 -0000 ARTEMIS-1115 Call CriticalIOListener on JDBC Error Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7b68b0a4 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7b68b0a4 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7b68b0a4 Branch: refs/heads/master Commit: 7b68b0a49ae91613c87215e0840ab6c8af3db32b Parents: fc4d5ed Author: Martyn Taylor Authored: Mon Apr 17 10:40:26 2017 +0100 Committer: Clebert Suconic Committed: Wed Apr 19 00:50:58 2017 -0400 ---------------------------------------------------------------------- .../jdbc/store/file/JDBCSequentialFile.java | 80 +++++++++++++------- .../store/file/JDBCSequentialFileFactory.java | 70 +++++++++++++---- .../file/JDBCSequentialFileFactoryDriver.java | 8 +- .../jdbc/store/journal/JDBCJournalImpl.java | 75 +++++++++--------- .../jdbc/store/journal/JDBCJournalRecord.java | 6 +- .../file/JDBCSequentialFileFactoryTest.java | 7 +- .../paging/impl/PagingStoreFactoryDatabase.java | 7 +- .../impl/journal/JDBCJournalStorageManager.java | 12 +-- .../jdbc/store/journal/JDBCJournalTest.java | 9 ++- 9 files changed, 187 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java index 7e72785..e2da151 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -93,18 +93,23 @@ public class JDBCSequentialFile implements SequentialFile { return fileFactory.listFiles(extension).contains(filename); } catch (Exception e) { logger.warn(e.getMessage(), e); + fileFactory.onIOError(e, "Error checking JDBC file exists.", this); return false; } } @Override public synchronized void open() throws Exception { - if (!isOpen) { - synchronized (writeLock) { - dbDriver.openFile(this); - isCreated = true; - isOpen = true; + try { + if (!isOpen) { + synchronized (writeLock) { + dbDriver.openFile(this); + isCreated = true; + isOpen = true; + } } + } catch (SQLException e) { + fileFactory.onIOError(e, "Error attempting to open JDBC file.", this); } } @@ -142,34 +147,35 @@ public class JDBCSequentialFile implements SequentialFile { } } } catch (SQLException e) { - throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e); + fileFactory.onIOError(e, "Error deleting JDBC file.", this); } } - private synchronized int internalWrite(byte[] data, IOCallback callback) { + private synchronized int internalWrite(byte[] data, IOCallback callback) throws Exception { try { synchronized (writeLock) { int noBytes = dbDriver.writeToFile(this, data); seek(noBytes); + System.out.println("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size()); if (callback != null) callback.done(); return noBytes; } } catch (Exception e) { - logger.warn("Failed to write to file", e.getMessage(), e); if (callback != null) - callback.onError(-1, e.getMessage()); + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); + fileFactory.onIOError(e, "Error writing to JDBC file.", this); } - return -1; + return 0; } - public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) { + public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) throws Exception { byte[] data = new byte[buffer.readableBytes()]; buffer.readBytes(data); return internalWrite(data, callback); } - private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) { + private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) throws Exception { return internalWrite(buffer.array(), callback); } @@ -177,16 +183,27 @@ public class JDBCSequentialFile implements SequentialFile { executor.execute(new Runnable() { @Override public void run() { - internalWrite(bytes, callback); + try { + internalWrite(bytes, callback); + } catch (Exception e) { + logger.error(e); + // internalWrite will notify the CriticalIOErrorListener + } } }); } private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) { + final SequentialFile file = this; executor.execute(new Runnable() { @Override public void run() { - internalWrite(bytes, callback); + try { + internalWrite(bytes, callback); + } catch (Exception e) { + logger.error(e); + fileFactory.onIOError(e, "Error on JDBC file sync", file); + } } }); } @@ -226,7 +243,8 @@ public class JDBCSequentialFile implements SequentialFile { scheduleWrite(bytes, waitIOCallback); waitIOCallback.waitCompletion(); } catch (Exception e) { - waitIOCallback.onError(-1, e.getMessage()); + waitIOCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error writing to JDBC file."); + fileFactory.onIOError(e, "Failed to write to file.", this); } } else { scheduleWrite(bytes, callback); @@ -249,12 +267,12 @@ public class JDBCSequentialFile implements SequentialFile { if (callback != null) callback.done(); return read; - } catch (Exception e) { + } catch (SQLException e) { if (callback != null) - callback.onError(-1, e.getMessage()); - logger.warn("Failed to read from file", e.getMessage(), e); - return 0; + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); + fileFactory.onIOError(e, "Error reading from JDBC file.", this); } + return 0; } } @@ -291,7 +309,8 @@ public class JDBCSequentialFile implements SequentialFile { try { callback.waitCompletion(); } catch (Exception e) { - throw new IOException(e); + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error during JDBC file sync."); + fileFactory.onIOError(e, "Error during JDBC file sync.", this); } } @@ -303,7 +322,11 @@ public class JDBCSequentialFile implements SequentialFile { @Override public void renameTo(String newFileName) throws Exception { synchronized (writeLock) { - dbDriver.renameFile(this, newFileName); + try { + dbDriver.renameFile(this, newFileName); + } catch (SQLException e) { + fileFactory.onIOError(e, "Error renaming JDBC file.", this); + } } } @@ -313,18 +336,21 @@ public class JDBCSequentialFile implements SequentialFile { JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock); return clone; } catch (Exception e) { - logger.error("Error cloning file: " + filename, e); - return null; + fileFactory.onIOError(e, "Error cloning JDBC file.", this); } + return null; } @Override public void copyTo(SequentialFile cloneFile) throws Exception { JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile; - clone.open(); - - synchronized (writeLock) { - dbDriver.copyFileData(this, clone); + try { + synchronized (writeLock) { + clone.open(); + dbDriver.copyFileData(this, clone); + } + } catch (Exception e) { + fileFactory.onIOError(e, "Error copying JDBC file.", this); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index fa88a85..d5a92a2 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -27,15 +27,19 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.jboss.logging.Logger; public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { + private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class); + private boolean started; private final List files = new ArrayList<>(); @@ -44,28 +48,53 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM private final Map fileLocks = new HashMap<>(); - private final JDBCSequentialFileFactoryDriver dbDriver; + private JDBCSequentialFileFactoryDriver dbDriver; + + private final IOCriticalErrorListener criticalErrorListener; public JDBCSequentialFileFactory(final DataSource dataSource, final SQLProvider sqlProvider, - Executor executor) throws Exception { + Executor executor, + IOCriticalErrorListener criticalErrorListener) throws Exception { + this.executor = executor; - dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider); + this.criticalErrorListener = criticalErrorListener; + + try { + this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); + } + } public JDBCSequentialFileFactory(final String connectionUrl, final String className, final SQLProvider sqlProvider, - Executor executor) throws Exception { + Executor executor, + IOCriticalErrorListener criticalErrorListener) throws Exception { this.executor = executor; - dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); + this.criticalErrorListener = criticalErrorListener; + try { + this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); + } + } public JDBCSequentialFileFactory(final Connection connection, final SQLProvider sqlProvider, - final Executor executor) throws Exception { + final Executor executor, + final IOCriticalErrorListener criticalErrorListener) throws Exception { this.executor = executor; - this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider); + this.criticalErrorListener = criticalErrorListener; + + try { + this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); + } } public JDBCSequentialFileFactoryDriver getDbDriver() { @@ -74,8 +103,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public SequentialFileFactory setDatasync(boolean enabled) { - - // noop return this; } @@ -92,7 +119,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM started = true; } } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database", e); + criticalErrorListener.onIOException(e, "Unable to start database driver", null); started = false; } } @@ -115,7 +142,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM files.add(file); return file; } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.error("Could not create file", e); + criticalErrorListener.onIOException(e, "Error whilst creating JDBC file", null); } return null; } @@ -127,7 +154,12 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public List listFiles(String extension) throws Exception { - return dbDriver.listFiles(extension); + try { + return dbDriver.listFiles(extension); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Error listing JDBC files.", null); + throw e; + } } @Override @@ -137,6 +169,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public void onIOError(Exception exception, String message, SequentialFile file) { + criticalErrorListener.onIOException(exception, message, file); } @Override @@ -215,9 +248,20 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public void flush() { + for (SequentialFile file : files) { + try { + file.sync(); + } catch (Exception e) { + criticalErrorListener.onIOException(e, "Error during JDBC file sync.", file); + } + } } public synchronized void destroy() throws SQLException { - dbDriver.destroy(); + try { + dbDriver.destroy(); + } catch (SQLException e) { + logger.error("Error destroying file factory", e); + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java index cf8d39d..a901f6a 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java @@ -29,10 +29,13 @@ import java.util.List; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.jboss.logging.Logger; @SuppressWarnings("SynchronizeOnNonFinalField") public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { + private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class); + protected PreparedStatement deleteFile; protected PreparedStatement createFile; @@ -157,6 +160,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { Blob blob = rs.getBlob(1); if (blob != null) { file.setWritePosition((int) blob.length()); + } else { + logger.warn("ERROR NO BLOB FOR FILE" + "File: " + file.getFileName() + " " + file.getId()); } } connection.commit(); @@ -293,8 +298,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { connection.commit(); return readLength; } catch (Throwable e) { - connection.rollback(); throw e; + } finally { + connection.rollback(); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 4df6a17..0e67dbc 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.EncodingSupport; @@ -84,26 +86,32 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { // Sequence ID for journal records private final AtomicLong seq = new AtomicLong(0); + private final IOCriticalErrorListener criticalIOErrorListener; + public JDBCJournalImpl(DataSource dataSource, SQLProvider provider, String tableName, ScheduledExecutorService scheduledExecutorService, - Executor completeExecutor) { + Executor completeExecutor, + IOCriticalErrorListener criticalIOErrorListener) { super(dataSource, provider); records = new ArrayList<>(); this.scheduledExecutorService = scheduledExecutorService; this.completeExecutor = completeExecutor; + this.criticalIOErrorListener = criticalIOErrorListener; } public JDBCJournalImpl(String jdbcUrl, String jdbcDriverClass, SQLProvider sqlProvider, ScheduledExecutorService scheduledExecutorService, - Executor completeExecutor) { + Executor completeExecutor, + IOCriticalErrorListener criticalIOErrorListener) { super(sqlProvider, jdbcUrl, jdbcDriverClass); records = new ArrayList<>(); this.scheduledExecutorService = scheduledExecutorService; this.completeExecutor = completeExecutor; + this.criticalIOErrorListener = criticalIOErrorListener; } @Override @@ -133,9 +141,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public synchronized void stop() throws SQLException { + public void stop() throws SQLException { + stop(true); + } + + public synchronized void stop(boolean sync) throws SQLException { if (started) { - sync(); + if (sync) + sync(); started = false; super.stop(); } @@ -166,8 +179,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { TransactionHolder holder; - boolean success = false; try { + connection.setAutoCommit(false); + for (JDBCJournalRecord record : recordRef) { switch (record.getRecordType()) { case JDBCJournalRecord.DELETE_RECORD: @@ -197,36 +211,25 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { break; } } - } catch (SQLException e) { - logger.warn(e.getMessage(), e); - executeCallbacks(recordRef, success); - return 0; - } - - try { - connection.setAutoCommit(false); insertJournalRecords.executeBatch(); deleteJournalRecords.executeBatch(); deleteJournalTxRecords.executeBatch(); connection.commit(); - success = true; - } catch (SQLException e) { - logger.warn(e.getMessage(), e); - performRollback(recordRef); - } - try { - if (success) - cleanupTxRecords(deletedRecords, committedTransactions); - } catch (SQLException e) { - logger.warn("Failed to remove the Tx Records", e.getMessage(), e); - } finally { - executeCallbacks(recordRef, success); - } + cleanupTxRecords(deletedRecords, committedTransactions); + executeCallbacks(recordRef, true); + + return recordRef.size(); - return recordRef.size(); + } catch (Exception e) { + criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null); + started = false; + executeCallbacks(recordRef, false); + performRollback(recordRef); + return 0; + } } /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, @@ -262,8 +265,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private void performRollback(List records) { try { - connection.rollback(); - for (JDBCJournalRecord record : records) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { removeTxRecord(record); @@ -279,13 +280,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { transactions.remove(txH.transactionID); } } + connection.rollback(); } catch (Exception sqlE) { - logger.warn(sqlE.getMessage(), sqlE); + logger.error(sqlE.getMessage(), sqlE); + criticalIOErrorListener.onIOException(sqlE, sqlE.getMessage(), null); ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE); } } - // TODO Use an executor. private void executeCallbacks(final List records, final boolean result) { Runnable r = new Runnable() { @Override @@ -299,7 +301,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } private void appendRecord(JDBCJournalRecord record) throws Exception { + record.storeLineUp(); + if (!started) { + if (record.getIoCompletion() != null) { + record.getIoCompletion().onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Journal not started"); + } + } SimpleWaitIOCallback callback = null; if (record.isSync() && record.getIoCompletion() == null) { @@ -318,10 +326,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } syncTimer.delay(); - - if (callback != null) { - callback.waitCompletion(); - } + if (callback != null) callback.waitCompletion(); } private synchronized void addTxRecord(JDBCJournalRecord record) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index b094164..a33888d 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -26,6 +26,7 @@ import java.sql.SQLException; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.persistence.Persister; @@ -116,7 +117,7 @@ class JDBCJournalRecord { if (success) { ioCompletion.done(); } else { - ioCompletion.onError(1, "DATABASE TRANSACTION FAILED"); + ioCompletion.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Transaction failed."); } } } @@ -127,7 +128,7 @@ class JDBCJournalRecord { } } - void writeRecord(PreparedStatement statement) throws SQLException { + void writeRecord(PreparedStatement statement) throws Exception { byte[] recordBytes = new byte[variableSize]; byte[] txDataBytes = new byte[txDataSize]; @@ -137,6 +138,7 @@ class JDBCJournalRecord { txData.read(txDataBytes); } catch (IOException e) { ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e); + throw e; } statement.setLong(1, id); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java index b7d0c9d..b04b74f 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; @@ -64,7 +65,11 @@ public class JDBCSequentialFileFactoryTest { String connectionUrl = "jdbc:derby:target/data;create=true"; String tableName = "FILES"; - factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor); + factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + } + }); factory.start(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java index 3177b6e..112cc46f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java @@ -91,6 +91,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { private boolean started = false; + private final IOCriticalErrorListener criticalErrorListener; + public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf, final StorageManager storageManager, final long syncTimeout, @@ -104,6 +106,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { this.scheduledExecutor = scheduledExecutor; this.syncTimeout = syncTimeout; this.dbConf = dbConf; + this.criticalErrorListener = critialErrorListener; start(); } @@ -119,9 +122,11 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { if (sqlProviderFactory == null) { sqlProviderFactory = new GenericSQLProvider.Factory(); } + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); } else { String driverClassName = dbConf.getJdbcDriverClassName(); + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); } pagingFactoryFileFactory.start(); @@ -232,7 +237,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE); } - return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor()); + return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener); } private String getTableNameForGUID(String guid) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index 8634638..5592c9e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -62,14 +62,14 @@ public class JDBCJournalStorageManager extends JournalStorageManager { if (sqlProviderFactory == null) { sqlProviderFactory = new GenericSQLProvider.Factory(); } - bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor()); - messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor()); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor); + bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener); } else { String driverClassName = dbConf.getJdbcDriverClassName(); - bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor()); - messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor()); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor); + bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener); } largeMessagesFactory.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java index ebb5c0e..8f1d25a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java @@ -26,6 +26,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -77,7 +79,12 @@ public class JDBCJournalTest extends ActiveMQTestBase { executorService = Executors.newSingleThreadExecutor(); jdbcUrl = "jdbc:derby:target/data;create=true"; SQLProvider.Factory factory = new DerbySQLProvider.Factory(); - journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService); + journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + + } + }); journal.start(); }