activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-513 Add JDBC Sequential File Factory Impl
Date Wed, 04 May 2016 17:38:38 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 2a415a80e -> e458a4327


ARTEMIS-513 Add JDBC Sequential File Factory Impl


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c9b95343
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c9b95343
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c9b95343

Branch: refs/heads/master
Commit: c9b953433e011ded66ba87bdb2b2edcf0b87f67c
Parents: 2a415a8
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Tue May 3 14:36:10 2016 +0100
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Wed May 4 12:24:25 2016 +0100

----------------------------------------------------------------------
 .../activemq/artemis/jdbc/store/JDBCUtils.java  |  13 +
 .../jdbc/store/file/JDBCSequentialFile.java     | 419 +++++++++++++++++++
 .../store/file/JDBCSequentialFileFactory.java   | 229 ++++++++++
 .../jdbc/store/file/sql/DerbySQLProvider.java   |  52 +++
 .../jdbc/store/file/sql/GenericSQLProvider.java | 143 +++++++
 .../jdbc/store/file/sql/SQLProvider.java        |  46 ++
 .../jdbc/store/journal/JDBCJournalImpl.java     |   7 +-
 .../file/JDBCSequentialFileFactoryTest.java     | 185 ++++++++
 .../core/io/nio/NIOSequentialFileFactory.java   |  28 +-
 9 files changed, 1107 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
index b44f225..bc04ab9 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
@@ -23,6 +23,10 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
+import org.apache.activemq.artemis.jdbc.store.file.sql.DerbySQLProvider;
+import org.apache.activemq.artemis.jdbc.store.file.sql.GenericSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
+
 public class JDBCUtils {
 
    public static Driver getDriver(String className) throws Exception {
@@ -60,4 +64,13 @@ public class JDBCUtils {
          statement.executeUpdate(sql);
       }
    }
+
+   public static SQLProvider getSQLProvider(String driverClass, String tableName) {
+      if (driverClass.contains("derby")) {
+         return new DerbySQLProvider(tableName);
+      }
+      else {
+         return new GenericSQLProvider(tableName);
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
new file mode 100644
index 0000000..73bec72
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
+import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.jboss.logging.Logger;
+
+/**
+ * A {@link SequentialFile} stored as one row of a database table: the content lives in a BLOB column and the
+ * row is addressed by its generated id.
+ * <p>
+ * Writes are handed to the {@link Executor} supplied at construction; statements that read or append to the
+ * row run while holding the {@code writeLock} supplied at construction.
+ * <p>
+ * NOTE(review): the prepared statements created in the constructor are never closed by this class;
+ * {@link #close()} only clears the open flag. Confirm who is expected to release them.
+ */
+public class JDBCSequentialFile implements SequentialFile {
+
+   private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class);
+
+   private final String filename;
+
+   private final String extension;
+
+   private boolean isOpen = false;
+
+   private boolean isCreated = false;
+
+   // Row id of the file record; -1 until open() has located or created the record.
+   private int id = -1;
+
+   private final PreparedStatement appendToFile;
+
+   private final PreparedStatement deleteFile;
+
+   private final PreparedStatement readFile;
+
+   private final PreparedStatement createFile;
+
+   private final PreparedStatement selectFileByFileName;
+
+   private final PreparedStatement copyFileRecord;
+
+   private final PreparedStatement renameFile;
+
+   private long readPosition = 0;
+
+   private long writePosition = 0;
+
+   private final Executor executor;
+
+   private final JDBCSequentialFileFactory fileFactory;
+
+   private final int maxSize;
+
+   private final SQLProvider sqlProvider;
+
+   private final Object writeLock;
+
+   /**
+    * Prepares every statement this file needs on the factory's connection.
+    *
+    * @param fileFactory factory that owns the database connection
+    * @param filename    name of the file; the text after the last '.' becomes the extension ("" if none)
+    * @param sqlProvider source of the SQL text and of the maximum BLOB size
+    * @param executor    executor on which writes are run
+    * @param writeLock   monitor held while the file record is read or modified
+    * @throws SQLException if a statement cannot be prepared
+    */
+   public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
+                             final String filename,
+                             final SQLProvider sqlProvider,
+                             final Executor executor,
+                             final Object writeLock) throws SQLException {
+      this.fileFactory = fileFactory;
+      this.filename = filename;
+      this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
+      this.executor = executor;
+      this.maxSize = sqlProvider.getMaxBlobSize();
+      this.sqlProvider = sqlProvider;
+      this.writeLock = writeLock;
+
+      Connection connection = fileFactory.getConnection();
+      this.appendToFile = connection.prepareStatement(sqlProvider.getAppendToFileSQL());
+      this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
+      this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
+      this.readFile = connection.prepareStatement(sqlProvider.getReadFileSQL());
+      this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
+      this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
+      this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
+   }
+
+   @Override
+   public boolean isOpen() {
+      return isOpen;
+   }
+
+   @Override
+   public boolean exists() {
+      return isCreated;
+   }
+
+   /**
+    * Looks the file record up by name, creating an empty record when none exists, and marks the file open.
+    * <p>
+    * A database failure is logged and leaves the file closed; the open/created flags are only set once the
+    * record has actually been found or inserted.
+    */
+   @Override
+   public synchronized void open() throws Exception {
+      if (isOpen) {
+         return;
+      }
+
+      try {
+         synchronized (writeLock) {
+            selectFileByFileName.setString(1, filename);
+
+            try (ResultSet rs = selectFileByFileName.executeQuery()) {
+               if (rs.next()) {
+                  this.id = rs.getInt(1);
+                  this.writePosition = rs.getBlob(4).length();
+               }
+               else {
+                  insertEmptyRecord();
+               }
+            }
+         }
+
+         isCreated = true;
+         isOpen = true;
+      }
+      catch (SQLException e) {
+         ActiveMQJournalLogger.LOGGER.error("Error retreiving file record", e);
+         isOpen = false;
+      }
+   }
+
+   // Inserts a record with empty content for this file and stores the generated row id.
+   private void insertEmptyRecord() throws SQLException {
+      createFile.setString(1, filename);
+      createFile.setString(2, extension);
+      createFile.setBytes(3, new byte[0]);
+      createFile.executeUpdate();
+      try (ResultSet keys = createFile.getGeneratedKeys()) {
+         if (!keys.next()) {
+            throw new SQLException("No generated key returned for file: " + filename);
+         }
+         this.id = keys.getInt(1);
+      }
+   }
+
+   @Override
+   public void open(int maxIO, boolean useExecutor) throws Exception {
+      open();
+   }
+
+   /**
+    * @return true when {@code size} more bytes stay within the provider's maximum BLOB size
+    */
+   @Override
+   public boolean fits(int size) {
+      return writePosition + size <= maxSize;
+   }
+
+   @Override
+   public int getAlignment() throws Exception {
+      return 0;
+   }
+
+   @Override
+   public int calculateBlockStart(int position) throws Exception {
+      return 0;
+   }
+
+   @Override
+   public String getFileName() {
+      return filename;
+   }
+
+   @Override
+   public void fill(int size) throws Exception {
+      // Do nothing
+   }
+
+   /**
+    * Deletes the file record if this file has been created.
+    *
+    * @throws IOException wrapping any {@link SQLException} raised by the delete
+    */
+   @Override
+   public void delete() throws IOException, InterruptedException, ActiveMQException {
+      try {
+         if (isCreated) {
+            deleteFile.setInt(1, id);
+            deleteFile.executeUpdate();
+         }
+      }
+      catch (SQLException e) {
+         throw new IOException(e);
+      }
+   }
+
+   /**
+    * Appends {@code data} to the file record and advances the write position.
+    *
+    * @param data     bytes to append
+    * @param callback notified of completion or failure; may be null
+    * @return the number of bytes written, or -1 on failure
+    */
+   private synchronized int internalWrite(byte[] data, IOCallback callback) {
+      try {
+         synchronized (writeLock) {
+            int noBytes = data.length;
+            appendToFile.setBytes(1, data);
+            appendToFile.setInt(2, id);
+            int result = appendToFile.executeUpdate();
+            if (result < 1) {
+               throw new ActiveMQException("No record found for file id: " + id);
+            }
+            seek(noBytes);
+            if (callback != null) {
+               callback.done();
+            }
+            return noBytes;
+         }
+      }
+      catch (Exception e) {
+         logger.error("Error writing to file: " + filename, e);
+         if (callback != null) {
+            callback.onError(-1, e.getMessage());
+         }
+      }
+      return -1;
+   }
+
+   /**
+    * Appends the readable bytes of {@code buffer} to the file record.
+    */
+   public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
+      byte[] data = new byte[buffer.readableBytes()];
+      buffer.readBytes(data);
+      return internalWrite(data, callback);
+   }
+
+   /**
+    * Appends the bytes between the buffer's position and limit to the file record.
+    * <p>
+    * The bytes are copied through a duplicate so that direct buffers (which have no backing array) work and
+    * the caller's position is left untouched.
+    */
+   private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
+      ByteBuffer view = buffer.duplicate();
+      byte[] data = new byte[view.remaining()];
+      view.get(data);
+      return internalWrite(data, callback);
+   }
+
+   public void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
+      executor.execute(new Runnable() {
+         @Override
+         public void run() {
+            internalWrite(bytes, callback);
+         }
+      });
+   }
+
+   public void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
+      executor.execute(new Runnable() {
+         @Override
+         public void run() {
+            internalWrite(bytes, callback);
+         }
+      });
+   }
+
+   // Advances the write position by noBytes (relative, not absolute).
+   synchronized void seek(long noBytes) {
+      writePosition += noBytes;
+   }
+
+   @Override
+   public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
+      // We ignore sync since we schedule writes straight away.
+      scheduleWrite(bytes, callback);
+   }
+
+   @Override
+   public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
+      write(bytes, sync, null);
+   }
+
+   @Override
+   public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
+      ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(bytes.getEncodeSize());
+      bytes.encode(data);
+      scheduleWrite(data, callback);
+   }
+
+   @Override
+   public void write(EncodingSupport bytes, boolean sync) throws Exception {
+      write(bytes, sync, null);
+   }
+
+   /**
+    * Schedules a write of {@code bytes}. Without a callback the call blocks until the scheduled write has
+    * completed; with a callback it returns immediately and the callback is notified.
+    */
+   @Override
+   public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
+      if (callback == null) {
+         SimpleWaitIOCallback waitIOCallback = new SimpleWaitIOCallback();
+         try {
+            scheduleWrite(bytes, waitIOCallback);
+            waitIOCallback.waitCompletion();
+         }
+         catch (Exception e) {
+            if (e instanceof InterruptedException) {
+               // Keep the interrupt visible to the caller; this method cannot rethrow it.
+               Thread.currentThread().interrupt();
+            }
+            logger.error("Error waiting for write to file: " + filename, e);
+            waitIOCallback.onError(-1, e.getMessage());
+         }
+      }
+      else {
+         scheduleWrite(bytes, callback);
+      }
+   }
+
+   @Override
+   public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
+      writeDirect(bytes, sync, null);
+      // Are we meant to block here?
+   }
+
+   /**
+    * Copies content from the current read position into {@code bytes}, up to the space remaining in it, and
+    * advances the read position.
+    *
+    * @param bytes    destination buffer
+    * @param callback notified of completion or failure; may be null
+    * @return the number of bytes read; 0 when there is no record, no data left, or the read failed
+    * @throws SQLException if the file id cannot be bound to the read statement
+    */
+   @Override
+   public synchronized int read(ByteBuffer bytes, IOCallback callback) throws SQLException {
+      synchronized (writeLock) {
+         readFile.setInt(1, id);
+         try (ResultSet rs = readFile.executeQuery()) {
+            if (!rs.next()) {
+               return 0;
+            }
+
+            Blob blob = rs.getBlob(1);
+            long bytesRemaining = blob.length() - readPosition;
+            if (bytesRemaining <= 0) {
+               // Nothing left to read; avoid asking the driver for a range at or past the end of the blob.
+               if (callback != null) {
+                  callback.done();
+               }
+               return 0;
+            }
+
+            int length = (int) Math.min(bytesRemaining, bytes.remaining());
+            // First index into blob is 1 (not 0)
+            byte[] data = blob.getBytes(readPosition + 1, length);
+
+            bytes.put(data);
+            readPosition += data.length;
+            if (callback != null) {
+               callback.done();
+            }
+            return data.length;
+         }
+         catch (Exception e) {
+            logger.error("Error reading file: " + filename, e);
+            if (callback != null) {
+               callback.onError(-1, e.getMessage());
+            }
+            return 0;
+         }
+      }
+   }
+
+   @Override
+   public int read(ByteBuffer bytes) throws Exception {
+      return read(bytes, null);
+   }
+
+   // Sets the read position; the write position is tracked separately.
+   @Override
+   public void position(long pos) throws IOException {
+      readPosition = pos;
+   }
+
+   @Override
+   public long position() {
+      return readPosition;
+   }
+
+   @Override
+   public synchronized void close() throws Exception {
+      isOpen = false;
+   }
+
+   @Override
+   public void sync() throws IOException {
+      // (mtaylor) We always write straight away, so we don't need to do anything here.
+      // (mtaylor) Is this meant to be blocking?
+   }
+
+   @Override
+   public long size() throws Exception {
+      return writePosition;
+   }
+
+   /**
+    * Renames the file record in the database.
+    * <p>
+    * NOTE(review): the in-memory {@code filename} is final and keeps the old name after this call, so
+    * {@link #getFileName()} still returns it - confirm whether callers rely on that.
+    */
+   @Override
+   public void renameTo(String newFileName) throws Exception {
+      renameFile.setString(1, newFileName);
+      renameFile.setInt(2, id);
+      renameFile.executeUpdate();
+   }
+
+   /**
+    * @return a new, unopened instance for the same file name sharing this file's factory, executor and lock,
+    *         or null if its statements could not be prepared
+    */
+   @Override
+   public SequentialFile cloneFile() {
+      try {
+         return new JDBCSequentialFile(fileFactory, filename, sqlProvider, executor, writeLock);
+      }
+      catch (Exception e) {
+         logger.error("Error cloning file: " + filename, e);
+         return null;
+      }
+   }
+
+   /**
+    * Opens {@code cloneFile} and overwrites its content with this file's content.
+    *
+    * @param cloneFile destination; must be a {@link JDBCSequentialFile}
+    */
+   @Override
+   public void copyTo(SequentialFile cloneFile) throws Exception {
+      JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
+      clone.open();
+
+      copyFileRecord.setInt(1, id);
+      copyFileRecord.setInt(2, clone.getId());
+      copyFileRecord.executeUpdate();
+   }
+
+   public int getId() {
+      return id;
+   }
+
+   public void setId(int id) {
+      this.id = id;
+   }
+
+   public String getFilename() {
+      return filename;
+   }
+
+   public String getExtension() {
+      return extension;
+   }
+
+   // Only Used by Journal, no need to implement.
+   @Override
+   public void setTimedBuffer(TimedBuffer buffer) {
+   }
+
+   // Only Used by replication, no need to implement.
+   @Override
+   public File getJavaFile() {
+      return null;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
new file mode 100644
index 0000000..4231907
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.file;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
+/**
+ * A {@link SequentialFileFactory} whose files are rows of a single database table, reached over one JDBC
+ * connection that is opened by {@link #start()}.
+ */
+public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
+
+   private Connection connection;
+
+   private final String connectionUrl;
+
+   private final Driver driver;
+
+   private boolean started;
+
+   private final String tableName;
+
+   private final List<JDBCSequentialFile> files;
+
+   private PreparedStatement selectFileNamesByExtension;
+
+   private final Executor executor;
+
+   private final SQLProvider sqlProvider;
+
+   // One lock object per file name, shared by every JDBCSequentialFile created for that name.
+   private final Map<String, Object> fileLocks = new HashMap<>();
+
+   /**
+    * @param connectionUrl JDBC URL passed to the driver when the factory is started
+    * @param tableName     name of the file table; it is upper-cased and the same upper-cased name is used for
+    *                      both the table-existence check and the generated SQL
+    * @param className     JDBC driver class name
+    * @param executor      executor handed to every file for its writes
+    * @throws Exception if the driver cannot be obtained
+    */
+   public JDBCSequentialFileFactory(final String connectionUrl,
+                                    final String tableName,
+                                    final String className,
+                                    Executor executor) throws Exception {
+      this.connectionUrl = connectionUrl;
+      this.executor = executor;
+      this.tableName = tableName.toUpperCase();
+
+      files = new ArrayList<>();
+      // Obtain the driver once and reuse it for dialect detection.
+      driver = JDBCUtils.getDriver(className);
+      sqlProvider = JDBCUtils.getSQLProvider(driver.getClass().getCanonicalName(), this.tableName);
+   }
+
+   public Connection getConnection() {
+      return connection;
+   }
+
+   /**
+    * Creates a file object for {@code fileName}; the database record itself is created when the file is opened.
+    *
+    * @return the new file, or null if it could not be created (the error is logged)
+    */
+   @Override
+   public SequentialFile createSequentialFile(String fileName) {
+      try {
+         Object fileLock = fileLocks.computeIfAbsent(fileName, name -> new Object());
+         JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, sqlProvider, executor, fileLock);
+         files.add(file);
+         return file;
+      }
+      catch (Exception e) {
+         ActiveMQJournalLogger.LOGGER.error("Could not create file", e);
+      }
+      return null;
+   }
+
+   @Override
+   public int getMaxIO() {
+      return 1;
+   }
+
+   /**
+    * @param extension file extension to match
+    * @return the names of all file records stored with that extension
+    */
+   @Override
+   public List<String> listFiles(String extension) throws Exception {
+      List<String> fileNames = new ArrayList<>();
+
+      selectFileNamesByExtension.setString(1, extension);
+      try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
+         while (rs.next()) {
+            fileNames.add(rs.getString(1));
+         }
+      }
+      return fileNames;
+   }
+
+   @Override
+   public boolean isSupportsCallbacks() {
+      return true;
+   }
+
+   @Override
+   public void onIOError(Exception exception, String message, SequentialFile file) {
+   }
+
+   @Override
+   public ByteBuffer allocateDirectBuffer(final int size) {
+      return NIOSequentialFileFactory.allocateDirectByteBuffer(size);
+   }
+
+   @Override
+   public void releaseDirectBuffer(ByteBuffer buffer) {
+      // nothing we can do on this case. we can just have good faith on GC
+   }
+
+   @Override
+   public ByteBuffer newBuffer(final int size) {
+      return ByteBuffer.allocate(size);
+   }
+
+   // Zeroes the buffer from index 0 up to its limit and leaves the position at 0.
+   @Override
+   public void clearBuffer(final ByteBuffer buffer) {
+      final int limit = buffer.limit();
+      buffer.rewind();
+
+      for (int i = 0; i < limit; i++) {
+         buffer.put((byte) 0);
+      }
+
+      buffer.rewind();
+   }
+
+   @Override
+   public ByteBuffer wrapBuffer(final byte[] bytes) {
+      return ByteBuffer.wrap(bytes);
+   }
+
+   @Override
+   public int getAlignment() {
+      return 1;
+   }
+
+   @Override
+   public int calculateBlockSize(final int bytes) {
+      return bytes;
+   }
+
+   @Override
+   public void deactivateBuffer() {
+   }
+
+   @Override
+   public void releaseBuffer(final ByteBuffer buffer) {
+   }
+
+   @Override
+   public void activateBuffer(SequentialFile file) {
+
+   }
+
+   @Override
+   public File getDirectory() {
+      return null;
+   }
+
+   /**
+    * Connects to the database, creates the file table if it does not exist and prepares the listing statement.
+    * A failure is logged together with its cause and leaves the factory stopped.
+    */
+   @Override
+   public synchronized void start() {
+      try {
+         if (!started) {
+            connection = driver.connect(connectionUrl, new Properties());
+            JDBCUtils.createTableIfNotExists(connection, tableName, sqlProvider.getCreateFileTableSQL());
+            selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
+            started = true;
+         }
+      }
+      catch (SQLException e) {
+         ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database", e);
+         started = false;
+      }
+   }
+
+   /**
+    * Marks the factory as stopped.
+    * <p>
+    * NOTE(review): closing the connection is deliberately disabled here ({@code if (false)}), so the connection
+    * opened by {@link #start()} is never released and a later start() opens another one. Confirm whether files
+    * are expected to keep using the connection after stop() before enabling the close.
+    */
+   @Override
+   public synchronized void stop() {
+      try {
+         if (false) {
+            connection.close();
+         }
+      }
+      catch (SQLException e) {
+         ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection");
+      }
+      started = false;
+   }
+
+   @Override
+   public boolean isStarted() {
+      return started;
+   }
+
+   @Override
+   public void createDirs() throws Exception {
+   }
+
+   @Override
+   public void flush() {
+
+   }
+
+   /**
+    * Drops the file table and stops the factory.
+    *
+    * @throws SQLException if the table cannot be dropped; the factory is not stopped in that case
+    */
+   public synchronized void destroy() throws SQLException {
+      try (Statement statement = connection.createStatement()) {
+         statement.executeUpdate(sqlProvider.getDropFileTableSQL());
+      }
+      stop();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java
new file mode 100644
index 0000000..c14036e
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.file.sql;
+
+/**
+ * SQL dialect for Derby: overrides the table-creation and append statements of {@link GenericSQLProvider}
+ * and raises the maximum BLOB size.
+ */
+public class DerbySQLProvider extends GenericSQLProvider {
+
+   // Derby max blob size = 2G
+   private static final int MAX_BLOB_SIZE = 2147483647;
+
+   private final String createFileTableSQL;
+
+   private final String appendToFileSQL;
+
+   /**
+    * @param tableName name of the file table used in every generated statement
+    */
+   public DerbySQLProvider(String tableName) {
+      super(tableName);
+
+      createFileTableSQL = "CREATE TABLE " + tableName +
+         "(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
+         "FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
+
+      // Uses the || operator for the append where the generic statement uses CONCAT.
+      appendToFileSQL = "UPDATE " + tableName + " SET DATA = DATA || ? WHERE ID=?";
+   }
+
+   @Override
+   public int getMaxBlobSize() {
+      return MAX_BLOB_SIZE;
+   }
+
+   @Override
+   public String getCreateFileTableSQL() {
+      return createFileTableSQL;
+   }
+
+   @Override
+   public String getAppendToFileSQL() {
+      return appendToFileSQL;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java
new file mode 100644
index 0000000..c95edb3
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.file.sql;
+
+/**
+ * Default {@link SQLProvider}: builds every statement once, at construction, for a given table name.
+ * Positional parameters ({@code ?}) are bound by the caller in the order they appear in each statement.
+ */
+public class GenericSQLProvider implements SQLProvider {
+
+   // Default to lowest (MYSQL = 64k)
+   private static final int MAX_BLOB_SIZE = 64512;
+
+   private final String tableName;
+
+   private final String createFileTableSQL;
+
+   private final String insertFileSQL;
+
+   private final String selectFileNamesByExtensionSQL;
+
+   private final String selectIdByFileNameSQL;
+
+   private final String appendToFileSQL;
+
+   private final String readFileSQL;
+
+   private final String deleteFileSQL;
+
+   private final String updateFileNameByIdSQL;
+
+   private final String copyFileRecordByIdSQL;
+
+   private final String cloneFileRecordSQL;
+
+   private final String dropFileTableSQL;
+
+   /**
+    * @param tableName name of the file table, concatenated as-is into every statement
+    */
+   public GenericSQLProvider(String tableName) {
+      this.tableName = tableName;
+
+      createFileTableSQL = "CREATE TABLE " + tableName +
+         "(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
+
+      insertFileSQL = "INSERT INTO " + tableName +
+         " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
+
+      selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?";
+
+      selectIdByFileNameSQL = "SELECT ID, FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE fileName=?";
+
+      appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?";
+
+      readFileSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
+
+      deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?";
+
+      updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
+
+      cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
+         "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
+
+      // First parameter is the source row id, second the destination row id.
+      copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
+
+      dropFileTableSQL = "DROP TABLE " + tableName;
+   }
+
+   @Override
+   public int getMaxBlobSize() {
+      return MAX_BLOB_SIZE;
+   }
+
+   @Override
+   public String getTableName() {
+      return tableName;
+   }
+
+   @Override
+   public String getCreateFileTableSQL() {
+      return createFileTableSQL;
+   }
+
+   @Override
+   public String getInsertFileSQL() {
+      return insertFileSQL;
+   }
+
+   // Returns the full record (ID, FILENAME, EXTENSION, DATA) selected by file name.
+   @Override
+   public String getSelectFileByFileName() {
+      return selectIdByFileNameSQL;
+   }
+
+   @Override
+   public String getSelectFileNamesByExtensionSQL() {
+      return selectFileNamesByExtensionSQL;
+   }
+
+   @Override
+   public String getAppendToFileSQL() {
+      return appendToFileSQL;
+   }
+
+   @Override
+   public String getReadFileSQL() {
+      return readFileSQL;
+   }
+
+   @Override
+   public String getDeleteFileSQL() {
+      return deleteFileSQL;
+   }
+
+   @Override
+   public String getUpdateFileNameByIdSQL() {
+      return updateFileNameByIdSQL;
+   }
+
+   @Override
+   public String getCopyFileRecordByIdSQL() {
+      return copyFileRecordByIdSQL;
+   }
+
+   @Override
+   public String getCloneFileRecordByIdSQL() {
+      return cloneFileRecordSQL;
+   }
+
+   @Override
+   public String getDropFileTableSQL() {
+      return dropFileTableSQL;
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java
new file mode 100644
index 0000000..e9fe36c
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.file.sql;
+
+public interface SQLProvider { // supplies the table name, BLOB size limit and SQL statement strings for the jdbc.store.file package
+
+   int getMaxBlobSize(); // the provider in this commit returns its MAX_BLOB_SIZE; units presumably bytes - confirm
+
+   String getTableName(); // table the statements below operate on
+
+   String getCreateFileTableSQL(); // presumably CREATE TABLE for the file table - confirm against implementations
+
+   String getInsertFileSQL(); // presumably INSERT of a new file record - confirm against implementations
+
+   String getSelectFileNamesByExtensionSQL(); // presumably lists file names having a given extension - confirm
+
+   String getSelectFileByFileName(); // NOTE(review): no "SQL" suffix unlike its siblings; the provider in this commit backs it with selectIdByFileNameSQL
+
+   String getAppendToFileSQL(); // presumably appends data to an existing file record - confirm
+
+   String getReadFileSQL(); // presumably reads the data of a file record - confirm
+
+   String getDeleteFileSQL(); // presumably deletes a file record - confirm
+
+   String getUpdateFileNameByIdSQL(); // presumably renames the file record with a given ID - confirm
+
+   String getCopyFileRecordByIdSQL(); // provider in this commit: UPDATE <table> SET DATA = (SELECT DATA FROM <table> WHERE ID=?) WHERE ID=?
+
+   String getDropFileTableSQL(); // provider in this commit: "DROP TABLE " + tableName
+
+   String getCloneFileRecordByIdSQL(); // presumably duplicates a file record selected by ID - confirm
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 73a8602..f253167 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -229,12 +229,13 @@ public class JDBCJournalImpl implements Journal {
 
    /* We store Transaction references in memory (once all records associated with a Transaction are Deleted,
       we remove the Tx Records (i.e. PREPARE, COMMIT)). */
-   private void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx)
throws SQLException {
+   private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long>
committedTx) throws SQLException {
 
       List<RecordInfo> iterableCopy;
       List<TransactionHolder> iterableCopyTx = new ArrayList<>();
       iterableCopyTx.addAll(transactions.values());
 
+
       for (Long txId : committedTx) {
          transactions.get(txId).committed = true;
       }
@@ -319,7 +320,7 @@ public class JDBCJournalImpl implements Journal {
       if (callback != null) callback.waitCompletion();
    }
 
-   private void addTxRecord(JDBCJournalRecord record) {
+   private synchronized void addTxRecord(JDBCJournalRecord record) {
       TransactionHolder txHolder = transactions.get(record.getTxId());
       if (txHolder == null) {
          txHolder = new TransactionHolder(record.getTxId());
@@ -341,7 +342,7 @@ public class JDBCJournalImpl implements Journal {
       }
    }
 
-   private void removeTxRecord(JDBCJournalRecord record) {
+   private synchronized void removeTxRecord(JDBCJournalRecord record) {
       TransactionHolder txHolder = transactions.get(record.getTxId());
 
       // We actually only need the record ID in this instance.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
new file mode 100644
index 0000000..554f36b
--- /dev/null
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.file;
+
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
+import org.apache.derby.jdbc.EmbeddedDriver;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class JDBCSequentialFileFactoryTest {
+
+   private static String connectionUrl = "jdbc:derby:target/data;create=true";
+
+   private static String tableName = "FILES";
+
+   private static String className = EmbeddedDriver.class.getCanonicalName();
+
+   private JDBCSequentialFileFactory factory;
+
+   @Before
+   public void setup() throws Exception {
+      Executor executor = Executors.newSingleThreadExecutor();
+
+      factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor);
+      factory.start();
+   }
+
+   @After
+   public void tearDown() throws SQLException {
+      factory.destroy();
+   }
+
+   @Test
+   public void testJDBCFileFactoryStarted() throws Exception {
+      assertTrue(factory.isStarted());
+   }
+
+   @Test
+   public void testCreateFiles() throws Exception {
+      int noFiles = 100;
+      Set<String> fileNames = new HashSet<String>();
+      for (int i = 0; i < noFiles; i++) {
+         String fileName = UUID.randomUUID().toString() + ".txt";
+         fileNames.add(fileName);
+         SequentialFile file = factory.createSequentialFile(fileName);
+         // We create files on Open
+         file.open();
+      }
+
+      List<String> queryFileNames = factory.listFiles("txt");
+      assertTrue(queryFileNames.containsAll(fileNames));
+   }
+
+   @Test
+   public void testAsyncAppendToFile() throws Exception {
+
+      JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
+      file.open();
+
+      // Create buffer and fill with test data
+      int bufferSize = 1024;
+      ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize);
+      for (int i = 0; i < bufferSize; i++) {
+         src.writeByte((byte) 1);
+      }
+
+      IOCallbackCountdown callback = new IOCallbackCountdown(1);
+      file.internalWrite(src, callback);
+
+      callback.assertEmpty(5);
+      checkData(file, src);
+   }
+
+   @Test
+   public void testCopyFile() throws Exception {
+      JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
+      file.open();
+
+      // Create buffer and fill with test data
+      int bufferSize = 1024;
+      ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize);
+      for (int i = 0; i < bufferSize; i++) {
+         src.writeByte((byte) 5);
+      }
+
+      IOCallbackCountdown callback = new IOCallbackCountdown(1);
+      file.internalWrite(src, callback);
+
+      JDBCSequentialFile copy = (JDBCSequentialFile) factory.createSequentialFile("copy.txt");
+      file.copyTo(copy);
+
+      checkData(copy, src);
+      checkData(file, src);
+   }
+
+   @Test
+   public void testCloneFile() throws Exception {
+      JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
+      file.open();
+
+      // Create buffer and fill with test data
+      int bufferSize = 1024;
+      ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize);
+      for (int i = 0; i < bufferSize; i++) {
+         src.writeByte((byte) 5);
+      }
+
+      IOCallbackCountdown callback = new IOCallbackCountdown(1);
+      file.internalWrite(src, callback);
+
+      JDBCSequentialFile copy = (JDBCSequentialFile) file.cloneFile();
+   }
+
+   private void checkData(JDBCSequentialFile file, ActiveMQBuffer expectedData) throws SQLException
{
+      expectedData.resetReaderIndex();
+
+      byte[] resultingBytes = new byte[expectedData.readableBytes()];
+      ByteBuffer byteBuffer = ByteBuffer.allocate(expectedData.readableBytes());
+
+      file.read(byteBuffer, null);
+      expectedData.getBytes(0, resultingBytes);
+
+      assertArrayEquals(resultingBytes, byteBuffer.array());
+   }
+
+   private class IOCallbackCountdown implements IOCallback {
+
+      private final CountDownLatch countDownLatch;
+
+      public IOCallbackCountdown(int size) {
+         this.countDownLatch = new CountDownLatch(size);
+      }
+
+      @Override
+      public void done() {
+         countDownLatch.countDown();
+      }
+
+      @Override
+      public void onError(int errorCode, String errorMessage) {
+         fail(errorMessage);
+      }
+
+      public void assertEmpty(int timeout) throws InterruptedException {
+         countDownLatch.await(timeout, TimeUnit.SECONDS);
+         assertEquals(countDownLatch.getCount(), 0);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
index 67c3038..a5884b9 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
@@ -65,18 +65,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory
{
       super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener);
    }
 
-   @Override
-   public SequentialFile createSequentialFile(final String fileName) {
-      return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
-   }
-
-   @Override
-   public boolean isSupportsCallbacks() {
-      return timedBuffer != null;
-   }
-
-   @Override
-   public ByteBuffer allocateDirectBuffer(final int size) {
+   public static ByteBuffer allocateDirectByteBuffer(final int size) {
       // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
       ByteBuffer buffer2 = null;
       try {
@@ -105,6 +94,21 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory
{
    }
 
    @Override
+   public SequentialFile createSequentialFile(final String fileName) { // creates a new NIOSequentialFile for fileName
+      return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); // passes this factory plus its journalDir, maxIO and writeExecutor
+   }
+
+   @Override
+   public boolean isSupportsCallbacks() { // callbacks are reported as supported only when a timedBuffer is present
+      return timedBuffer != null;
+   }
+
+   @Override
+   public ByteBuffer allocateDirectBuffer(final int size) { // instance-level entry point for buffer allocation
+      return NIOSequentialFileFactory.allocateDirectByteBuffer(size); // delegates to the static helper so callers without a factory instance can share it
+   }
+
+   @Override // intentional no-op: the method body contains no statements
    public void releaseDirectBuffer(ByteBuffer buffer) {
       // nothing we can do on this case. we can just have good faith on GC
    }


Mime
View raw message