activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-560 Add Support For JDBC Paging
Date Tue, 17 Jan 2017 20:02:35 GMT
ARTEMIS-560 Add Support For JDBC Paging

(cherry picked from commit 118c272c771ac4f2df168d6ef0278c8ade7b700d)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/115ccf87
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/115ccf87
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/115ccf87

Branch: refs/heads/1.x
Commit: 115ccf874d996f44c2a73e852ed5c4a0365c20b5
Parents: 0db6345
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Thu Jan 12 11:16:48 2017 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Jan 17 15:02:24 2017 -0500

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |   7 +
 .../jdbc/store/drivers/AbstractJDBCDriver.java  |  79 +++---
 .../store/drivers/derby/DerbySQLProvider.java   |   2 +-
 .../artemis/jdbc/store/file/JDBCFileUtils.java  |  12 +
 .../jdbc/store/file/JDBCSequentialFile.java     |   7 +-
 .../store/file/JDBCSequentialFileFactory.java   |  12 +
 .../file/JDBCSequentialFileFactoryDriver.java   | 253 +++++++++++--------
 .../PostgresSequentialSequentialFileDriver.java | 171 +++++++------
 .../storage/DatabaseStorageConfiguration.java   |  10 +
 .../deployers/impl/FileConfigurationParser.java |   1 +
 .../paging/impl/PagingStoreFactoryDatabase.java | 213 ++++++++++++++++
 .../core/paging/impl/PagingStoreFactoryNIO.java |   3 +-
 .../impl/journal/JDBCJournalStorageManager.java |   3 +
 .../core/server/impl/ActiveMQServerImpl.java    |  11 +-
 .../resources/schema/artemis-configuration.xsd  |   7 +
 .../artemis/tests/util/ActiveMQTestBase.java    |   3 +-
 .../test/resources/database-store-config.xml    |   1 +
 .../integration/paging/GlobalPagingTest.java    |  10 +
 .../tests/integration/paging/PagingTest.java    |  27 +-
 19 files changed, 601 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 7f503ba..0f26a00 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -426,6 +426,9 @@ public final class ActiveMQDefaultConfiguration {
    // Default large messages table name, used with Database storage type
    private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES";
 
+   // Default page store table name, used with Database storage type
+   private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE";
+
    // Default period to wait between connection TTL checks
    public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;
 
@@ -1165,6 +1168,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_LARGE_MESSAGES_TABLE_NAME;
    }
 
+   public static String getDefaultPageStoreTableName() {
+      return DEFAULT_PAGE_STORE_TABLE_NAME;
+   }
+
    public static long getDefaultConnectionTtlCheckInterval() {
       return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
index d75ea21..1828911 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -36,6 +36,7 @@ import org.jboss.logging.Logger;
 /**
  * Class to hold common database functionality such as drivers and connections
  */
+@SuppressWarnings("SynchronizeOnNonFinalField")
 public abstract class AbstractJDBCDriver {
 
    private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class);
@@ -66,17 +67,26 @@ public abstract class AbstractJDBCDriver {
 
    public void start() throws SQLException {
       connect();
-      createSchema();
-      prepareStatements();
+      synchronized (connection) {
+         createSchema();
+         prepareStatements();
+      }
+   }
+
+   public AbstractJDBCDriver(Connection connection, SQLProvider sqlProvider) {
+      this.connection = connection;
+      this.sqlProvider = sqlProvider;
    }
 
    public void stop() throws SQLException {
-      if (sqlProvider.closeConnectionOnShutdown()) {
-         try {
-            connection.close();
-         } catch (SQLException e) {
-            logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
-            throw e;
+      synchronized (connection) {
+         if (sqlProvider.closeConnectionOnShutdown()) {
+            try {
+               connection.close();
+            } catch (SQLException e) {
+               logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
+               throw e;
+            }
          }
       }
    }
@@ -90,30 +100,32 @@ public abstract class AbstractJDBCDriver {
    }
 
    private void connect() throws SQLException {
-      if (dataSource != null) {
-         try {
-            connection = dataSource.getConnection();
-         } catch (SQLException e) {
-            logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
-            throw e;
-         }
-      } else {
-         try {
-            if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) {
-               throw new IllegalStateException("jdbcDriverClass is null or empty!");
-            }
-            if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) {
-               throw new IllegalStateException("jdbcConnectionUrl is null or empty!");
+      if (connection == null) {
+         if (dataSource != null) {
+            try {
+               connection = dataSource.getConnection();
+            } catch (SQLException e) {
+               logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
+               throw e;
             }
-            final Driver dbDriver = getDriver(jdbcDriverClass);
-            connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
-            if (connection == null) {
-               throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl);
+         } else {
+            try {
+               if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) {
+                  throw new IllegalStateException("jdbcDriverClass is null or empty!");
+               }
+               if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) {
+                  throw new IllegalStateException("jdbcConnectionUrl is null or empty!");
+               }
+               final Driver dbDriver = getDriver(jdbcDriverClass);
+               connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
+               if (connection == null) {
+                  throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl);
+               }
+            } catch (SQLException e) {
+               logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
+               ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
+               throw e;
             }
-         } catch (SQLException e) {
-            logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
-            ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
-            throw e;
          }
       }
    }
@@ -206,8 +218,10 @@ public abstract class AbstractJDBCDriver {
       return connection;
    }
 
-   public void setConnection(Connection connection) {
-      this.connection = connection;
+   public final void setConnection(Connection connection) {
+      if (connection == null) {
+         this.connection = connection;
+      }
    }
 
    public void setSqlProvider(SQLProvider sqlProvider) {
@@ -225,4 +239,5 @@ public abstract class AbstractJDBCDriver {
    public void setDataSource(DataSource dataSource) {
       this.dataSource = dataSource;
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
index 121c6f7..281ea88 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
@@ -29,7 +29,7 @@ public class DerbySQLProvider extends GenericSQLProvider {
    private final String appendToFileSQL;
 
    private DerbySQLProvider(String tableName) {
-      super(tableName);
+      super(tableName.toUpperCase());
 
       createFileTableSQL = "CREATE TABLE " + tableName +
          "(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
index 02b1128..58494b0 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
@@ -18,6 +18,7 @@
 package org.apache.activemq.artemis.jdbc.store.file;
 
 import javax.sql.DataSource;
+import java.sql.Connection;
 import java.sql.SQLException;
 
 import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
@@ -45,4 +46,15 @@ class JDBCFileUtils {
       }
       return dbDriver;
    }
+
+   static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException {
+      JDBCSequentialFileFactoryDriver dbDriver;
+      if (provider instanceof PostgresSQLProvider) {
+         dbDriver = new PostgresSequentialSequentialFileDriver();
+         dbDriver.setConnection(connection);
+      } else {
+         dbDriver = new JDBCSequentialFileFactoryDriver(connection, provider);
+      }
+      return dbDriver;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 34b6a4f..3f078c2 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -88,7 +88,12 @@ public class JDBCSequentialFile implements SequentialFile {
 
    @Override
    public boolean exists() {
-      return isCreated;
+      if (isCreated) return true;
+      try {
+         return fileFactory.listFiles(extension).contains(filename);
+      } catch (Exception e) {
+         return false;
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 008e000..4b92c71 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.file;
 import javax.sql.DataSource;
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -60,6 +61,17 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
       dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
    }
 
+   public JDBCSequentialFileFactory(final Connection connection,
+                                    final SQLProvider sqlProvider,
+                                    final Executor executor) throws Exception {
+      this.executor = executor;
+      this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
+   }
+
+   public JDBCSequentialFileFactoryDriver getDbDriver() {
+      return dbDriver;
+   }
+
    @Override
    public SequentialFileFactory setDatasync(boolean enabled) {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
index 7b9eaf1..f9f206a 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.file;
 import javax.sql.DataSource;
 import java.nio.ByteBuffer;
 import java.sql.Blob;
+import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -29,6 +30,7 @@ import java.util.List;
 import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 
+@SuppressWarnings("SynchronizeOnNonFinalField")
 public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
 
    protected PreparedStatement deleteFile;
@@ -55,6 +57,10 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
       super(dataSource, provider);
    }
 
+   JDBCSequentialFileFactoryDriver(Connection connection, SQLProvider sqlProvider) {
+      super(connection, sqlProvider);
+   }
+
    @Override
    protected void createSchema() throws SQLException {
       createTable(sqlProvider.getCreateFileTableSQL());
@@ -72,22 +78,24 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
       this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
    }
 
-   public synchronized List<String> listFiles(String extension) throws Exception {
-      List<String> fileNames = new ArrayList<>();
-      try {
-         connection.setAutoCommit(false);
-         selectFileNamesByExtension.setString(1, extension);
-         try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
-            while (rs.next()) {
-               fileNames.add(rs.getString(1));
+   public List<String> listFiles(String extension) throws Exception {
+      synchronized (connection) {
+         List<String> fileNames = new ArrayList<>();
+         try {
+            connection.setAutoCommit(false);
+            selectFileNamesByExtension.setString(1, extension);
+            try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
+               while (rs.next()) {
+                  fileNames.add(rs.getString(1));
+               }
             }
+            connection.commit();
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
          }
-         connection.commit();
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
+         return fileNames;
       }
-      return fileNames;
    }
 
    /**
@@ -113,16 +121,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @return
     * @throws SQLException
     */
-   public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
-      connection.setAutoCommit(false);
-      selectFileByFileName.setString(1, file.getFileName());
-      try (ResultSet rs = selectFileByFileName.executeQuery()) {
-         int id = rs.next() ? rs.getInt(1) : -1;
-         connection.commit();
-         return id;
-      } catch (Exception e) {
-         connection.rollback();
-         throw e;
+   public int fileExists(JDBCSequentialFile file) throws SQLException {
+      try {
+         synchronized (connection) {
+            connection.setAutoCommit(false);
+            selectFileByFileName.setString(1, file.getFileName());
+            try (ResultSet rs = selectFileByFileName.executeQuery()) {
+               int id = rs.next() ? rs.getInt(1) : -1;
+               connection.commit();
+               return id;
+            } catch (Exception e) {
+               connection.rollback();
+               throw e;
+            }
+         }
+      } catch (NullPointerException npe) {
+         npe.printStackTrace();
+         throw npe;
       }
    }
 
@@ -132,18 +147,20 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @param file
     * @throws SQLException
     */
-   public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
-      connection.setAutoCommit(false);
-      readLargeObject.setInt(1, file.getId());
+   public void loadFile(JDBCSequentialFile file) throws SQLException {
+      synchronized (connection) {
+         connection.setAutoCommit(false);
+         readLargeObject.setInt(1, file.getId());
 
-      try (ResultSet rs = readLargeObject.executeQuery()) {
-         if (rs.next()) {
-            file.setWritePosition((int) rs.getBlob(1).length());
+         try (ResultSet rs = readLargeObject.executeQuery()) {
+            if (rs.next()) {
+               file.setWritePosition((int) rs.getBlob(1).length());
+            }
+            connection.commit();
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
          }
-         connection.commit();
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
       }
    }
 
@@ -153,21 +170,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @param file
     * @throws SQLException
     */
-   public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         createFile.setString(1, file.getFileName());
-         createFile.setString(2, file.getExtension());
-         createFile.setBytes(3, new byte[0]);
-         createFile.executeUpdate();
-         try (ResultSet keys = createFile.getGeneratedKeys()) {
-            keys.next();
-            file.setId(keys.getInt(1));
+   public void createFile(JDBCSequentialFile file) throws SQLException {
+      synchronized (connection) {
+         try {
+            connection.setAutoCommit(false);
+            createFile.setString(1, file.getFileName());
+            createFile.setString(2, file.getExtension());
+            createFile.setBytes(3, new byte[0]);
+            createFile.executeUpdate();
+            try (ResultSet keys = createFile.getGeneratedKeys()) {
+               keys.next();
+               file.setId(keys.getInt(1));
+            }
+            connection.commit();
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
          }
-         connection.commit();
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
       }
    }
 
@@ -178,16 +197,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @param newFileName
     * @throws SQLException
     */
-   public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         renameFile.setString(1, newFileName);
-         renameFile.setInt(2, file.getId());
-         renameFile.executeUpdate();
-         connection.commit();
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
+   public void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
+      synchronized (connection) {
+         try {
+            connection.setAutoCommit(false);
+            renameFile.setString(1, newFileName);
+            renameFile.setInt(2, file.getId());
+            renameFile.executeUpdate();
+            connection.commit();
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
+         }
       }
    }
 
@@ -197,15 +218,17 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @param file
     * @throws SQLException
     */
-   public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         deleteFile.setInt(1, file.getId());
-         deleteFile.executeUpdate();
-         connection.commit();
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
+   public void deleteFile(JDBCSequentialFile file) throws SQLException {
+      synchronized (connection) {
+         try {
+            connection.setAutoCommit(false);
+            deleteFile.setInt(1, file.getId());
+            deleteFile.executeUpdate();
+            connection.commit();
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
+         }
       }
    }
 
@@ -217,17 +240,19 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @return
     * @throws SQLException
     */
-   public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         appendToLargeObject.setBytes(1, data);
-         appendToLargeObject.setInt(2, file.getId());
-         appendToLargeObject.executeUpdate();
-         connection.commit();
-         return data.length;
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
+   public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+      synchronized (connection) {
+         try {
+            connection.setAutoCommit(false);
+            appendToLargeObject.setBytes(1, data);
+            appendToLargeObject.setInt(2, file.getId());
+            appendToLargeObject.executeUpdate();
+            connection.commit();
+            return data.length;
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
+         }
       }
    }
 
@@ -239,22 +264,24 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @return
     * @throws SQLException
     */
-   public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
-      connection.setAutoCommit(false);
-      readLargeObject.setInt(1, file.getId());
-      int readLength = 0;
-      try (ResultSet rs = readLargeObject.executeQuery()) {
-         if (rs.next()) {
-            Blob blob = rs.getBlob(1);
-            readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
-            byte[] data = blob.getBytes(file.position() + 1, readLength);
-            bytes.put(data);
+   public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+      synchronized (connection) {
+         connection.setAutoCommit(false);
+         readLargeObject.setInt(1, file.getId());
+         int readLength = 0;
+         try (ResultSet rs = readLargeObject.executeQuery()) {
+            if (rs.next()) {
+               Blob blob = rs.getBlob(1);
+               readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
+               byte[] data = blob.getBytes(file.position() + 1, readLength);
+               bytes.put(data);
+            }
+            connection.commit();
+            return readLength;
+         } catch (Throwable e) {
+            connection.rollback();
+            throw e;
          }
-         connection.commit();
-         return readLength;
-      } catch (Throwable e) {
-         connection.rollback();
-         throw e;
       }
    }
 
@@ -265,16 +292,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @param fileTo
     * @throws SQLException
     */
-   public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         copyFileRecord.setInt(1, fileFrom.getId());
-         copyFileRecord.setInt(2, fileTo.getId());
-         copyFileRecord.executeUpdate();
-         connection.commit();
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
+   public void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
+      synchronized (connection) {
+         try {
+            connection.setAutoCommit(false);
+            copyFileRecord.setInt(1, fileFrom.getId());
+            copyFileRecord.setInt(2, fileTo.getId());
+            copyFileRecord.executeUpdate();
+            connection.commit();
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
+         }
       }
    }
 
@@ -282,16 +311,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * Drop all tables and data
     */
    @Override
-   public synchronized void destroy() throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         try (Statement statement = connection.createStatement()) {
-            statement.executeUpdate(sqlProvider.getDropFileTableSQL());
+   public void destroy() throws SQLException {
+      synchronized (connection) {
+         try {
+            connection.setAutoCommit(false);
+            try (Statement statement = connection.createStatement()) {
+               statement.executeUpdate(sqlProvider.getDropFileTableSQL());
+            }
+            connection.commit();
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
          }
-         connection.commit();
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
index c7411a6..8c0f975 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
@@ -24,6 +24,7 @@ import org.postgresql.PGConnection;
 import org.postgresql.largeobject.LargeObject;
 import org.postgresql.largeobject.LargeObjectManager;
 
+@SuppressWarnings("SynchronizeOnNonFinalField")
 public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
 
    private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
@@ -33,105 +34,115 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
    }
 
    @Override
-   public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
+   public void createFile(JDBCSequentialFile file) throws SQLException {
+      synchronized (connection) {
+         try {
+            connection.setAutoCommit(false);
 
-         LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
-         long oid = lobjManager.createLO();
+            LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+            long oid = lobjManager.createLO();
 
-         createFile.setString(1, file.getFileName());
-         createFile.setString(2, file.getExtension());
-         createFile.setLong(3, oid);
-         createFile.executeUpdate();
+            createFile.setString(1, file.getFileName());
+            createFile.setString(2, file.getExtension());
+            createFile.setLong(3, oid);
+            createFile.executeUpdate();
 
-         try (ResultSet keys = createFile.getGeneratedKeys()) {
-            keys.next();
-            file.setId(keys.getInt(1));
+            try (ResultSet keys = createFile.getGeneratedKeys()) {
+               keys.next();
+               file.setId(keys.getInt(1));
+            }
+            connection.commit();
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
          }
-         connection.commit();
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
       }
    }
 
    @Override
-   public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
-      connection.setAutoCommit(false);
-      readLargeObject.setInt(1, file.getId());
+   public void loadFile(JDBCSequentialFile file) throws SQLException {
+      synchronized (connection) {
+         connection.setAutoCommit(false);
+         readLargeObject.setInt(1, file.getId());
 
-      try (ResultSet rs = readLargeObject.executeQuery()) {
-         if (rs.next()) {
-            file.setWritePosition(getPostGresLargeObjectSize(file));
+         try (ResultSet rs = readLargeObject.executeQuery()) {
+            if (rs.next()) {
+               file.setWritePosition(getPostGresLargeObjectSize(file));
+            }
+            connection.commit();
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
          }
-         connection.commit();
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
       }
    }
 
    @Override
-   public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
-      LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
-      LargeObject largeObject = null;
+   public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+      synchronized (connection) {
+         LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+         LargeObject largeObject = null;
 
-      Long oid = getOID(file);
-      try {
-         connection.setAutoCommit(false);
-         largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
-         largeObject.seek(largeObject.size());
-         largeObject.write(data);
-         largeObject.close();
-         connection.commit();
-      } catch (Exception e) {
-         connection.rollback();
-         throw e;
+         Long oid = getOID(file);
+         try {
+            connection.setAutoCommit(false);
+            largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
+            largeObject.seek(largeObject.size());
+            largeObject.write(data);
+            largeObject.close();
+            connection.commit();
+         } catch (Exception e) {
+            connection.rollback();
+            throw e;
+         }
+         return data.length;
       }
-      return data.length;
    }
 
    @Override
-   public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+   public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
       LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
       LargeObject largeObject = null;
       long oid = getOID(file);
-      try {
-         connection.setAutoCommit(false);
-         largeObject = lobjManager.open(oid, LargeObjectManager.READ);
-         int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
-
-         if (readLength > 0) {
-            if (file.position() > 0)
-               largeObject.seek((int) file.position());
-            byte[] data = largeObject.read(readLength);
-            bytes.put(data);
-         }
+      synchronized (connection) {
+         try {
+            connection.setAutoCommit(false);
+            largeObject = lobjManager.open(oid, LargeObjectManager.READ);
+            int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
+
+            if (readLength > 0) {
+               if (file.position() > 0)
+                  largeObject.seek((int) file.position());
+               byte[] data = largeObject.read(readLength);
+               bytes.put(data);
+            }
 
-         largeObject.close();
-         connection.commit();
+            largeObject.close();
+            connection.commit();
 
-         return readLength;
-      } catch (SQLException e) {
-         connection.rollback();
-         throw e;
+            return readLength;
+         } catch (SQLException e) {
+            connection.rollback();
+            throw e;
+         }
       }
    }
 
-   private synchronized Long getOID(JDBCSequentialFile file) throws SQLException {
+   private Long getOID(JDBCSequentialFile file) throws SQLException {
       Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
       if (oid == null) {
-         connection.setAutoCommit(false);
-         readLargeObject.setInt(1, file.getId());
-         try (ResultSet rs = readLargeObject.executeQuery()) {
-            if (rs.next()) {
-               file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
+         synchronized (connection) {
+            connection.setAutoCommit(false);
+            readLargeObject.setInt(1, file.getId());
+            try (ResultSet rs = readLargeObject.executeQuery()) {
+               if (rs.next()) {
+                  file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
+               }
+               connection.commit();
+            } catch (SQLException e) {
+               connection.rollback();
+               throw e;
             }
-            connection.commit();
-         } catch (SQLException e) {
-            connection.rollback();
-            throw e;
          }
       }
       if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
@@ -140,21 +151,23 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
       return (Long) file.getMetaData(POSTGRES_OID_KEY);
    }
 
-   private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
+   private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
       LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
 
       int size = 0;
       Long oid = getOID(file);
       if (oid != null) {
-         try {
-            connection.setAutoCommit(false);
-            LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
-            size = largeObject.size();
-            largeObject.close();
-            connection.commit();
-         } catch (SQLException e) {
-            connection.rollback();
-            throw e;
+         synchronized (connection) {
+            try {
+               connection.setAutoCommit(false);
+               LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
+               size = largeObject.size();
+               largeObject.close();
+               connection.commit();
+            } catch (SQLException e) {
+               connection.rollback();
+               throw e;
+            }
          }
       }
       return size;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
index 8b20770..eb8b435 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
@@ -30,6 +30,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
 
    private String largeMessagesTableName = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName();
 
+   private String pageStoreTableName = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName();
+
    private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
 
    private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
@@ -67,6 +69,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
       this.largeMessagesTableName = largeMessagesTableName;
    }
 
+   public String getPageStoreTableName() {
+      return pageStoreTableName;
+   }
+
+   public void setPageStoreTableName(String pageStoreTableName) {
+      this.pageStoreTableName = pageStoreTableName;
+   }
+
    public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
       this.jdbcConnectionUrl = jdbcConnectionUrl;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index bd8aaf6..afd99a7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1158,6 +1158,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK));
       conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK));
       conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
+      conf.setPageStoreTableName(getString(storeNode, "page-store-table-name", conf.getPageStoreTableName(), Validators.NO_CHECK));
       conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
       conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
       return conf;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
new file mode 100644
index 0000000..ee9d7bb
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+
+/**
+ * Integration point between Paging and JDBC
+ */
+public class PagingStoreFactoryDatabase implements PagingStoreFactory {
+
+   // Constants -----------------------------------------------------
+
+   private static final String ADDRESS_FILE = "address.txt";
+
+   private static final String DIRECTORY_NAME = "directory.txt";
+
+   // Attributes ----------------------------------------------------
+
+   protected final boolean syncNonTransactional;
+
+   private PagingManager pagingManager;
+
+   private final ScheduledExecutorService scheduledExecutor;
+
+   private final long syncTimeout;
+
+   protected final StorageManager storageManager;
+
+   private JDBCSequentialFileFactoryDriver dbDriver;
+
+   private DatabaseStorageConfiguration dbConf;
+
+   private ExecutorFactory executorFactory;
+
+   private JDBCSequentialFileFactory pagingFactoryFileFactory;
+
+   private JDBCSequentialFile directoryList;
+
+   public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
+                                     final StorageManager storageManager,
+                                     final long syncTimeout,
+                                     final ScheduledExecutorService scheduledExecutor,
+                                     final ExecutorFactory executorFactory,
+                                     final boolean syncNonTransactional,
+                                     final IOCriticalErrorListener critialErrorListener) throws Exception {
+      this.storageManager = storageManager;
+      this.executorFactory = executorFactory;
+      this.syncNonTransactional = syncNonTransactional;
+      this.scheduledExecutor = scheduledExecutor;
+      this.syncTimeout = syncTimeout;
+      this.dbConf = dbConf;
+
+      if (dbConf.getDataSource() != null) {
+         SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
+         if (sqlProviderFactory == null) {
+            sqlProviderFactory = new GenericSQLProvider.Factory();
+         }
+         pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName()), executorFactory.getExecutor());
+      } else {
+         String driverClassName = dbConf.getJdbcDriverClassName();
+         pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName()), executorFactory.getExecutor());
+      }
+      pagingFactoryFileFactory.start();
+      directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
+      directoryList.open();
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public void stop() {
+      pagingFactoryFileFactory.stop();
+   }
+
+   @Override
+   public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+   }
+
+   @Override
+   public PageCursorProvider newCursorProvider(PagingStore store,
+                                               StorageManager storageManager,
+                                               AddressSettings addressSettings,
+                                               Executor executor) {
+      return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+   }
+
+   @Override
+   public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
+
+      return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
+   }
+
+   @Override
+   public synchronized SequentialFileFactory newFileFactory(final SimpleString address) throws Exception {
+      String guid = UUIDGenerator.getInstance().generateStringUUID();
+      SequentialFileFactory factory = newFileFactory(guid, true);
+      factory.start();
+
+      SequentialFile file = factory.createSequentialFile(PagingStoreFactoryDatabase.ADDRESS_FILE);
+      file.open();
+
+      ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(SimpleString.sizeofNullableString(address));
+      buffer.writeSimpleString(address);
+      file.write(buffer, true);
+      return factory;
+   }
+
+   @Override
+   public void setPagingManager(final PagingManager pagingManager) {
+      this.pagingManager = pagingManager;
+   }
+
+   @Override
+   public synchronized List<PagingStore> reloadStores(final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
+      // We assume the directory list < Integer.MAX_VALUE (this is only a list of addresses).
+      int size = ((Long) directoryList.size()).intValue();
+      ActiveMQBuffer buffer = readActiveMQBuffer(directoryList, size);
+
+      ArrayList<PagingStore> storesReturn = new ArrayList<>();
+
+      while (buffer.readableBytes() > 0) {
+         SimpleString guid = buffer.readSimpleString();
+
+         JDBCSequentialFileFactory factory = (JDBCSequentialFileFactory) newFileFactory(guid.toString(), false);
+         factory.start();
+
+         JDBCSequentialFile addressFile = (JDBCSequentialFile) factory.createSequentialFile(ADDRESS_FILE);
+         addressFile.open();
+
+         size = ((Long) addressFile.size()).intValue();
+         if (size == 0) {
+            continue;
+         }
+
+         ActiveMQBuffer addrBuffer = readActiveMQBuffer(addressFile, size);
+         SimpleString address = addrBuffer.readSimpleString();
+
+         AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
+
+         PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
+
+         storesReturn.add(store);
+      }
+      return storesReturn;
+   }
+
+   private synchronized SequentialFileFactory newFileFactory(final String directoryName, boolean writeToDirectory) throws Exception {
+      SimpleString simpleString = SimpleString.toSimpleString(directoryName);
+      ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(simpleString.sizeof());
+      buffer.writeSimpleString(simpleString);
+      if (writeToDirectory) directoryList.write(buffer, true);
+      return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName)), executorFactory.getExecutor());
+   }
+
+   private String getTableNameForGUID(String guid) {
+      return dbConf.getPageStoreTableName() + guid.replace("-", "");
+   }
+
+   private ActiveMQBuffer readActiveMQBuffer(SequentialFile file, int size) throws Exception {
+      ByteBuffer byteBuffer = ByteBuffer.allocate(size);
+      byteBuffer.mark();
+      file.read(byteBuffer);
+      byteBuffer.reset();
+
+      ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(byteBuffer);
+      buffer.writerIndex(size);
+      return buffer;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index e0f3a22..823baf8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -70,7 +70,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
 
    private final long syncTimeout;
 
-   private final StorageManager storageManager;
+   protected final StorageManager storageManager;
 
    private final IOCriticalErrorListener critialErrorListener;
 
@@ -187,6 +187,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
    }
 
    private SequentialFileFactory newFileFactory(final String directoryName) {
+
       return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener, 1);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 4e5c447..416da0b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
 import java.nio.ByteBuffer;
+import java.sql.Connection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +35,8 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 public class JDBCJournalStorageManager extends JournalStorageManager {
 
+   private Connection connection;
+
    public JDBCJournalStorageManager(Configuration config,
                                     ExecutorFactory executorFactory,
                                     ExecutorFactory ioExecutorFactory,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 2f5f3fa..1e7d203 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.config.StoreConfiguration;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
 import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
@@ -68,8 +69,10 @@ import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
 import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
 import org.apache.activemq.artemis.core.persistence.GroupingInfo;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -1836,11 +1839,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       this.queueFactory = factory;
    }
 
-   protected PagingManager createPagingManager() {
+   protected PagingManager createPagingManager() throws Exception {
       return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize());
    }
 
-   protected PagingStoreFactoryNIO getPagingStoreFactory() {
+   protected PagingStoreFactory getPagingStoreFactory() throws Exception {
+      if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
+         DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
+         return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, false, shutdownOnCriticalIO);
+      }
       return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 46d92e3..c34ae24 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1696,6 +1696,13 @@
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
+         <xsd:element name="page-store-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The table name used to store page files
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
       </xsd:all>
    </xsd:complexType>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 7f01767..6fbf808 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -455,12 +455,13 @@ public abstract class ActiveMQTestBase extends Assert {
       return configuration;
    }
 
-   private void setDBStoreType(Configuration configuration) {
+   protected void setDBStoreType(Configuration configuration) {
       DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
       dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
       dbStorageConfiguration.setBindingsTableName("BINDINGS");
       dbStorageConfiguration.setMessageTableName("MESSAGE");
       dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE");
+      dbStorageConfiguration.setPageStoreTableName("PAGE_STORE");
       dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
 
       configuration.setStoreConfiguration(dbStorageConfiguration);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/artemis-server/src/test/resources/database-store-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/database-store-config.xml b/artemis-server/src/test/resources/database-store-config.xml
index 1fa3bd6..69f9da7 100644
--- a/artemis-server/src/test/resources/database-store-config.xml
+++ b/artemis-server/src/test/resources/database-store-config.xml
@@ -25,6 +25,7 @@
             <bindings-table-name>BINDINGS_TABLE</bindings-table-name>
             <message-table-name>MESSAGE_TABLE</message-table-name>
             <large-message-table-name>LARGE_MESSAGE_TABLE</large-message-table-name>
+            <page-store-table-name>PAGE_STORE_TABLE</page-store-table-name>
             <jdbc-driver-class-name>org.apache.derby.jdbc.EmbeddedDriver</jdbc-driver-class-name>
          </database-store>
       </store>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
index c94f54a..3960b49 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.StoreConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -36,9 +37,16 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class GlobalPagingTest extends PagingTest {
 
+   public GlobalPagingTest(StoreConfiguration.StoreType storeType) {
+      super(storeType);
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {
@@ -69,6 +77,8 @@ public class GlobalPagingTest extends PagingTest {
 
    @Test
    public void testPagingOverFullDisk() throws Exception {
+      if (storeType == StoreConfiguration.StoreType.DATABASE) return;
+
       clearDataRecreateServerDirs();
 
       Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/115ccf87/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 068a299..76f3dfd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -24,6 +24,8 @@ import java.io.OutputStream;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -51,6 +53,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.config.StoreConfiguration;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
@@ -84,7 +87,10 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class PagingTest extends ActiveMQTestBase {
 
    private static final Logger logger = Logger.getLogger(PagingTest.class);
@@ -102,8 +108,19 @@ public class PagingTest extends ActiveMQTestBase {
 
    protected static final int PAGE_SIZE = 10 * 1024;
 
+   protected final StoreConfiguration.StoreType storeType;
+
    static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
 
+   public PagingTest(StoreConfiguration.StoreType storeType) {
+      this.storeType = storeType;
+   }
+
+   @Parameterized.Parameters(name = "storeType={0}")
+   public static Collection<Object[]> data() {
+      Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
+      return Arrays.asList(params);
+   }
 
    @Before
    public void checkLoggerStart() throws Exception {
@@ -121,8 +138,6 @@ public class PagingTest extends ActiveMQTestBase {
       }
    }
 
-
-
    @Override
    @Before
    public void setUp() throws Exception {
@@ -1444,6 +1459,8 @@ public class PagingTest extends ActiveMQTestBase {
 
    @Test
    public void testMissingTXEverythingAcked() throws Exception {
+      if (storeType == StoreConfiguration.StoreType.DATABASE) return;
+
       clearDataRecreateServerDirs();
 
       Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
@@ -5631,7 +5648,11 @@ public class PagingTest extends ActiveMQTestBase {
 
    @Override
    protected Configuration createDefaultInVMConfig() throws Exception {
-      return super.createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+      Configuration configuration = super.createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+      if (storeType == StoreConfiguration.StoreType.DATABASE) {
+         setDBStoreType(configuration);
+      }
+      return configuration;
    }
 
    private static final class DummyOperationContext implements OperationContext {


Mime
View raw message