activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-1115 Call CriticalIOListener on JDBC Error
Date Wed, 19 Apr 2017 05:42:48 GMT
ARTEMIS-1115 Call CriticalIOListener on JDBC Error


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7b68b0a4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7b68b0a4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7b68b0a4

Branch: refs/heads/master
Commit: 7b68b0a49ae91613c87215e0840ab6c8af3db32b
Parents: fc4d5ed
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Mon Apr 17 10:40:26 2017 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Apr 19 00:50:58 2017 -0400

----------------------------------------------------------------------
 .../jdbc/store/file/JDBCSequentialFile.java     | 80 +++++++++++++-------
 .../store/file/JDBCSequentialFileFactory.java   | 70 +++++++++++++----
 .../file/JDBCSequentialFileFactoryDriver.java   |  8 +-
 .../jdbc/store/journal/JDBCJournalImpl.java     | 75 +++++++++---------
 .../jdbc/store/journal/JDBCJournalRecord.java   |  6 +-
 .../file/JDBCSequentialFileFactoryTest.java     |  7 +-
 .../paging/impl/PagingStoreFactoryDatabase.java |  7 +-
 .../impl/journal/JDBCJournalStorageManager.java | 12 +--
 .../jdbc/store/journal/JDBCJournalTest.java     |  9 ++-
 9 files changed, 187 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 7e72785..e2da151 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -93,18 +93,23 @@ public class JDBCSequentialFile implements SequentialFile {
          return fileFactory.listFiles(extension).contains(filename);
       } catch (Exception e) {
          logger.warn(e.getMessage(), e);
+         fileFactory.onIOError(e, "Error checking JDBC file exists.", this);
          return false;
       }
    }
 
    @Override
    public synchronized void open() throws Exception {
-      if (!isOpen) {
-         synchronized (writeLock) {
-            dbDriver.openFile(this);
-            isCreated = true;
-            isOpen = true;
+      try {
+         if (!isOpen) {
+            synchronized (writeLock) {
+               dbDriver.openFile(this);
+               isCreated = true;
+               isOpen = true;
+            }
          }
+      } catch (SQLException e) {
+         fileFactory.onIOError(e, "Error attempting to open JDBC file.", this);
       }
    }
 
@@ -142,34 +147,35 @@ public class JDBCSequentialFile implements SequentialFile {
             }
          }
       } catch (SQLException e) {
-         throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e);
+         fileFactory.onIOError(e, "Error deleting JDBC file.", this);
       }
    }
 
-   private synchronized int internalWrite(byte[] data, IOCallback callback) {
+   private synchronized int internalWrite(byte[] data, IOCallback callback) throws Exception {
       try {
          synchronized (writeLock) {
             int noBytes = dbDriver.writeToFile(this, data);
             seek(noBytes);
+            System.out.println("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size());
             if (callback != null)
                callback.done();
             return noBytes;
          }
       } catch (Exception e) {
-         logger.warn("Failed to write to file", e.getMessage(), e);
          if (callback != null)
-            callback.onError(-1, e.getMessage());
+            callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
+         fileFactory.onIOError(e, "Error writing to JDBC file.", this);
       }
-      return -1;
+      return 0;
    }
 
-   public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
+   public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) throws Exception {
       byte[] data = new byte[buffer.readableBytes()];
       buffer.readBytes(data);
       return internalWrite(data, callback);
    }
 
-   private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
+   private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) throws Exception {
       return internalWrite(buffer.array(), callback);
    }
 
@@ -177,16 +183,27 @@ public class JDBCSequentialFile implements SequentialFile {
       executor.execute(new Runnable() {
          @Override
          public void run() {
-            internalWrite(bytes, callback);
+            try {
+               internalWrite(bytes, callback);
+            } catch (Exception e) {
+               logger.error(e);
+               // internalWrite will notify the CriticalIOErrorListener
+            }
          }
       });
    }
 
    private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
+      final SequentialFile file = this;
       executor.execute(new Runnable() {
          @Override
          public void run() {
-            internalWrite(bytes, callback);
+            try {
+               internalWrite(bytes, callback);
+            } catch (Exception e) {
+               logger.error(e);
+               fileFactory.onIOError(e, "Error on JDBC file sync", file);
+            }
          }
       });
    }
@@ -226,7 +243,8 @@ public class JDBCSequentialFile implements SequentialFile {
             scheduleWrite(bytes, waitIOCallback);
             waitIOCallback.waitCompletion();
          } catch (Exception e) {
-            waitIOCallback.onError(-1, e.getMessage());
+            waitIOCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error writing to JDBC file.");
+            fileFactory.onIOError(e, "Failed to write to file.", this);
          }
       } else {
          scheduleWrite(bytes, callback);
@@ -249,12 +267,12 @@ public class JDBCSequentialFile implements SequentialFile {
             if (callback != null)
                callback.done();
             return read;
-         } catch (Exception e) {
+         } catch (SQLException e) {
             if (callback != null)
-               callback.onError(-1, e.getMessage());
-            logger.warn("Failed to read from file", e.getMessage(), e);
-            return 0;
+               callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
+            fileFactory.onIOError(e, "Error reading from JDBC file.", this);
          }
+         return 0;
       }
    }
 
@@ -291,7 +309,8 @@ public class JDBCSequentialFile implements SequentialFile {
       try {
          callback.waitCompletion();
       } catch (Exception e) {
-         throw new IOException(e);
+         callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error during JDBC file sync.");
+         fileFactory.onIOError(e, "Error during JDBC file sync.", this);
       }
    }
 
@@ -303,7 +322,11 @@ public class JDBCSequentialFile implements SequentialFile {
    @Override
    public void renameTo(String newFileName) throws Exception {
       synchronized (writeLock) {
-         dbDriver.renameFile(this, newFileName);
+         try {
+            dbDriver.renameFile(this, newFileName);
+         } catch (SQLException e) {
+            fileFactory.onIOError(e, "Error renaming JDBC file.", this);
+         }
       }
    }
 
@@ -313,18 +336,21 @@ public class JDBCSequentialFile implements SequentialFile {
          JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock);
          return clone;
       } catch (Exception e) {
-         logger.error("Error cloning file: " + filename, e);
-         return null;
+         fileFactory.onIOError(e, "Error cloning JDBC file.", this);
       }
+      return null;
    }
 
    @Override
    public void copyTo(SequentialFile cloneFile) throws Exception {
       JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
-      clone.open();
-
-      synchronized (writeLock) {
-         dbDriver.copyFileData(this, clone);
+      try {
+         synchronized (writeLock) {
+            clone.open();
+            dbDriver.copyFileData(this, clone);
+         }
+      } catch (Exception e) {
+         fileFactory.onIOError(e, "Error copying JDBC file.", this);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index fa88a85..d5a92a2 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -27,15 +27,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.jboss.logging.Logger;
 
 public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
 
+   private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class);
+
    private boolean started;
 
    private final List<JDBCSequentialFile> files = new ArrayList<>();
@@ -44,28 +48,53 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    private final Map<String, Object> fileLocks = new HashMap<>();
 
-   private final JDBCSequentialFileFactoryDriver dbDriver;
+   private JDBCSequentialFileFactoryDriver dbDriver;
+
+   private final IOCriticalErrorListener criticalErrorListener;
 
    public JDBCSequentialFileFactory(final DataSource dataSource,
                                     final SQLProvider sqlProvider,
-                                    Executor executor) throws Exception {
+                                    Executor executor,
+                                    IOCriticalErrorListener criticalErrorListener) throws Exception {
+
       this.executor = executor;
-      dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
+      this.criticalErrorListener = criticalErrorListener;
+
+      try {
+         this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
+      } catch (SQLException e) {
+         criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
+      }
+
    }
 
    public JDBCSequentialFileFactory(final String connectionUrl,
                                     final String className,
                                     final SQLProvider sqlProvider,
-                                    Executor executor) throws Exception {
+                                    Executor executor,
+                                    IOCriticalErrorListener criticalErrorListener) throws Exception {
       this.executor = executor;
-      dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
+      this.criticalErrorListener = criticalErrorListener;
+      try {
+         this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
+      } catch (SQLException e) {
+         criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
+      }
+
    }
 
    public JDBCSequentialFileFactory(final Connection connection,
                                     final SQLProvider sqlProvider,
-                                    final Executor executor) throws Exception {
+                                    final Executor executor,
+                                    final IOCriticalErrorListener criticalErrorListener) throws Exception {
       this.executor = executor;
-      this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
+      this.criticalErrorListener = criticalErrorListener;
+
+      try {
+         this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
+      } catch (SQLException e) {
+         criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
+      }
    }
 
    public JDBCSequentialFileFactoryDriver getDbDriver() {
@@ -74,8 +103,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    @Override
    public SequentialFileFactory setDatasync(boolean enabled) {
-
-      // noop
       return this;
    }
 
@@ -92,7 +119,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
             started = true;
          }
       } catch (Exception e) {
-         ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database", e);
+         criticalErrorListener.onIOException(e, "Unable to start database driver", null);
          started = false;
       }
    }
@@ -115,7 +142,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
          files.add(file);
          return file;
       } catch (Exception e) {
-         ActiveMQJournalLogger.LOGGER.error("Could not create file", e);
+         criticalErrorListener.onIOException(e, "Error whilst creating JDBC file", null);
       }
       return null;
    }
@@ -127,7 +154,12 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    @Override
    public List<String> listFiles(String extension) throws Exception {
-      return dbDriver.listFiles(extension);
+      try {
+         return dbDriver.listFiles(extension);
+      } catch (SQLException e) {
+         criticalErrorListener.onIOException(e, "Error listing JDBC files.", null);
+         throw e;
+      }
    }
 
    @Override
@@ -137,6 +169,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    @Override
    public void onIOError(Exception exception, String message, SequentialFile file) {
+      criticalErrorListener.onIOException(exception, message, file);
    }
 
    @Override
@@ -215,9 +248,20 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    @Override
    public void flush() {
+      for (SequentialFile file : files) {
+         try {
+            file.sync();
+         } catch (Exception e) {
+            criticalErrorListener.onIOException(e, "Error during JDBC file sync.", file);
+         }
+      }
    }
 
    public synchronized void destroy() throws SQLException {
-      dbDriver.destroy();
+      try {
+         dbDriver.destroy();
+      } catch (SQLException e) {
+         logger.error("Error destroying file factory", e);
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
index cf8d39d..a901f6a 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
@@ -29,10 +29,13 @@ import java.util.List;
 
 import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.jboss.logging.Logger;
 
 @SuppressWarnings("SynchronizeOnNonFinalField")
 public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
 
+   private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class);
+
    protected PreparedStatement deleteFile;
 
    protected PreparedStatement createFile;
@@ -157,6 +160,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
                Blob blob = rs.getBlob(1);
                if (blob != null) {
                   file.setWritePosition((int) blob.length());
+               } else {
+                  logger.warn("ERROR NO BLOB FOR FILE" + "File: " + file.getFileName() + " " + file.getId());
                }
             }
             connection.commit();
@@ -293,8 +298,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
             connection.commit();
             return readLength;
          } catch (Throwable e) {
-            connection.rollback();
             throw e;
+         } finally {
+            connection.rollback();
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 4df6a17..0e67dbc 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.EncoderPersister;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
@@ -84,26 +86,32 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    // Sequence ID for journal records
    private final AtomicLong seq = new AtomicLong(0);
 
+   private final IOCriticalErrorListener criticalIOErrorListener;
+
    public JDBCJournalImpl(DataSource dataSource,
                           SQLProvider provider,
                           String tableName,
                           ScheduledExecutorService scheduledExecutorService,
-                          Executor completeExecutor) {
+                          Executor completeExecutor,
+                          IOCriticalErrorListener criticalIOErrorListener) {
       super(dataSource, provider);
       records = new ArrayList<>();
       this.scheduledExecutorService = scheduledExecutorService;
       this.completeExecutor = completeExecutor;
+      this.criticalIOErrorListener = criticalIOErrorListener;
    }
 
    public JDBCJournalImpl(String jdbcUrl,
                           String jdbcDriverClass,
                           SQLProvider sqlProvider,
                           ScheduledExecutorService scheduledExecutorService,
-                          Executor completeExecutor) {
+                          Executor completeExecutor,
+                          IOCriticalErrorListener criticalIOErrorListener) {
       super(sqlProvider, jdbcUrl, jdbcDriverClass);
       records = new ArrayList<>();
       this.scheduledExecutorService = scheduledExecutorService;
       this.completeExecutor = completeExecutor;
+      this.criticalIOErrorListener = criticalIOErrorListener;
    }
 
    @Override
@@ -133,9 +141,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public synchronized void stop() throws SQLException {
+   public void stop() throws SQLException {
+      stop(true);
+   }
+
+   public synchronized void stop(boolean sync) throws SQLException {
       if (started) {
-         sync();
+         if (sync)
+            sync();
          started = false;
          super.stop();
       }
@@ -166,8 +179,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
 
       TransactionHolder holder;
 
-      boolean success = false;
       try {
+         connection.setAutoCommit(false);
+
          for (JDBCJournalRecord record : recordRef) {
             switch (record.getRecordType()) {
                case JDBCJournalRecord.DELETE_RECORD:
@@ -197,36 +211,25 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
                   break;
             }
          }
-      } catch (SQLException e) {
-         logger.warn(e.getMessage(), e);
-         executeCallbacks(recordRef, success);
-         return 0;
-      }
-
-      try {
-         connection.setAutoCommit(false);
 
          insertJournalRecords.executeBatch();
          deleteJournalRecords.executeBatch();
          deleteJournalTxRecords.executeBatch();
 
          connection.commit();
-         success = true;
-      } catch (SQLException e) {
-         logger.warn(e.getMessage(), e);
-         performRollback(recordRef);
-      }
 
-      try {
-         if (success)
-            cleanupTxRecords(deletedRecords, committedTransactions);
-      } catch (SQLException e) {
-         logger.warn("Failed to remove the Tx Records", e.getMessage(), e);
-      } finally {
-         executeCallbacks(recordRef, success);
-      }
+         cleanupTxRecords(deletedRecords, committedTransactions);
+         executeCallbacks(recordRef, true);
+
+         return recordRef.size();
 
-      return recordRef.size();
+      } catch (Exception e) {
+         criticalIOErrorListener.onIOException(e, "Critical IO Error.  Failed to process JDBC Record statements", null);
+         started = false;
+         executeCallbacks(recordRef, false);
+         performRollback(recordRef);
+         return 0;
+      }
    }
 
    /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
@@ -262,8 +265,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
 
    private void performRollback(List<JDBCJournalRecord> records) {
       try {
-         connection.rollback();
-
          for (JDBCJournalRecord record : records) {
             if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
                removeTxRecord(record);
@@ -279,13 +280,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
                transactions.remove(txH.transactionID);
             }
          }
+         connection.rollback();
       } catch (Exception sqlE) {
-         logger.warn(sqlE.getMessage(), sqlE);
+         logger.error(sqlE.getMessage(), sqlE);
+         criticalIOErrorListener.onIOException(sqlE, sqlE.getMessage(), null);
          ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE);
       }
    }
 
-   // TODO Use an executor.
    private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) {
       Runnable r = new Runnable() {
          @Override
@@ -299,7 +301,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    private void appendRecord(JDBCJournalRecord record) throws Exception {
+
       record.storeLineUp();
+      if (!started) {
+         if (record.getIoCompletion() != null) {
+            record.getIoCompletion().onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Journal not started");
+         }
+      }
 
       SimpleWaitIOCallback callback = null;
       if (record.isSync() && record.getIoCompletion() == null) {
@@ -318,10 +326,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
       }
 
       syncTimer.delay();
-
-      if (callback != null) {
-         callback.waitCompletion();
-      }
+      if (callback != null) callback.waitCompletion();
    }
 
    private synchronized void addTxRecord(JDBCJournalRecord record) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
index b094164..a33888d 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
@@ -26,6 +26,7 @@ import java.sql.SQLException;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.persistence.Persister;
@@ -116,7 +117,7 @@ class JDBCJournalRecord {
          if (success) {
             ioCompletion.done();
          } else {
-            ioCompletion.onError(1, "DATABASE TRANSACTION FAILED");
+            ioCompletion.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Transaction failed.");
          }
       }
    }
@@ -127,7 +128,7 @@ class JDBCJournalRecord {
       }
    }
 
-   void writeRecord(PreparedStatement statement) throws SQLException {
+   void writeRecord(PreparedStatement statement) throws Exception {
 
       byte[] recordBytes = new byte[variableSize];
       byte[] txDataBytes = new byte[txDataSize];
@@ -137,6 +138,7 @@ class JDBCJournalRecord {
          txData.read(txDataBytes);
       } catch (IOException e) {
          ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e);
+         throw e;
       }
 
       statement.setLong(1, id);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
index b7d0c9d..b04b74f 100644
--- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
 import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
@@ -64,7 +65,11 @@ public class JDBCSequentialFileFactoryTest {
 
       String connectionUrl = "jdbc:derby:target/data;create=true";
       String tableName = "FILES";
-      factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor);
+      factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
+         @Override
+         public void onIOException(Throwable code, String message, SequentialFile file) {
+         }
+      });
       factory.start();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
index 3177b6e..112cc46f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
@@ -91,6 +91,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
 
    private boolean started = false;
 
+   private final IOCriticalErrorListener criticalErrorListener;
+
    public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
                                      final StorageManager storageManager,
                                      final long syncTimeout,
@@ -104,6 +106,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
       this.scheduledExecutor = scheduledExecutor;
       this.syncTimeout = syncTimeout;
       this.dbConf = dbConf;
+      this.criticalErrorListener = critialErrorListener;
       start();
    }
 
@@ -119,9 +122,11 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
             if (sqlProviderFactory == null) {
                sqlProviderFactory = new GenericSQLProvider.Factory();
             }
+            pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
             pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
          } else {
             String driverClassName = dbConf.getJdbcDriverClassName();
+            pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
             pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
          }
          pagingFactoryFileFactory.start();
@@ -232,7 +237,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
          sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
       }
 
-      return  new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor());
+      return  new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener);
    }
 
    private String getTableNameForGUID(String guid) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 8634638..5592c9e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -62,14 +62,14 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
             if (sqlProviderFactory == null) {
                sqlProviderFactory = new GenericSQLProvider.Factory();
             }
-            bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor());
-            messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor());
-            largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor);
+            bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
+            messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
+            largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener);
          } else {
             String driverClassName = dbConf.getJdbcDriverClassName();
-            bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor());
-            messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor());
-            largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor);
+            bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
+            messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
+            largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener);
          }
          largeMessagesFactory.start();
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b68b0a4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
index ebb5c0e..8f1d25a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
@@ -77,7 +79,12 @@ public class JDBCJournalTest extends ActiveMQTestBase {
       executorService = Executors.newSingleThreadExecutor();
       jdbcUrl = "jdbc:derby:target/data;create=true";
       SQLProvider.Factory factory = new DerbySQLProvider.Factory();
-      journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService);
+      journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() {
+         @Override
+         public void onIOException(Throwable code, String message, SequentialFile file) {
+
+         }
+      });
       journal.start();
    }
 


Mime
View raw message