activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1653 Allow database tables to be created externally
Date Tue, 17 Apr 2018 14:41:57 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 8f4042c40 -> 95e9e6e2b


ARTEMIS-1653 Allow database tables to be created externally

The previous commit about this feature wasn't using the row count query
ResultSet.
The mechanics has been changed to allow the row count query
to fail, because DROP and CREATE aren't transactional and immediate
in most DBMS.
It includes a test that stress its mechanics if used with DBMS like
DB2 10.5 and Oracle 12c.
Additional checks and logs have been added to trace each steps.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c7651853
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c7651853
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c7651853

Branch: refs/heads/master
Commit: c7651853cdb291dfa3bd2906e1e082fd06cff612
Parents: 8f4042c
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Tue Apr 3 10:11:04 2018 +0200
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Apr 17 10:41:14 2018 -0400

----------------------------------------------------------------------
 .../jdbc/store/drivers/AbstractJDBCDriver.java  |  59 ++++++++--
 .../impl/jdbc/JdbcSharedStateManagerTest.java   | 108 +++++++++++++++++++
 .../core/server/impl/jdbc/TestJDBCDriver.java   |  29 ++++-
 .../jdbc/store/journal/JDBCJournalTest.java     |  55 +++++++---
 4 files changed, 222 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c7651853/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
index 62c9501..e421a3b 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -182,25 +182,62 @@ public abstract class AbstractJDBCDriver {
       logger.tracef("Validating if table %s didn't exist before creating", tableName);
       try {
          connection.setAutoCommit(false);
+         final boolean tableExists;
          try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null))
{
-            if (rs != null && !rs.next()) {
+            if ((rs == null) || (rs != null && !rs.next())) {
+               tableExists = false;
                if (logger.isTraceEnabled()) {
                   logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName,
Arrays.toString(sqls));
                }
-               final SQLWarning sqlWarning = rs.getWarnings();
-               if (sqlWarning != null) {
-                  logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
+               if (rs != null) {
+                  final SQLWarning sqlWarning = rs.getWarnings();
+                  if (sqlWarning != null) {
+                     logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(),
sqlWarning));
+                  }
                }
             } else {
-               try (Statement statement = connection.createStatement();
-                     ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL()))
{
-                  if (rs.next() && rs.getInt(1) > 0) {
-                     logger.tracef("Table %s did exist but is not empty. Skipping initialization.",
tableName);
+               tableExists = true;
+            }
+         }
+         if (tableExists) {
+            logger.tracef("Validating if the existing table %s is initialized or not", tableName);
+            try (Statement statement = connection.createStatement();
+                 ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL()))
{
+               logger.tracef("Validation of the existing table %s initialization is started",
tableName);
+               int rows;
+               if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) {
+                  logger.tracef("Table %s did exist but is not empty. Skipping initialization.
Found %d rows.", tableName, rows);
+                  if (logger.isDebugEnabled()) {
+                     final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql
-> sql.contains("INSERT INTO")).count();
+                     if (rows < expectedRows) {
+                        logger.debug("Table " + tableName + " was expected to contain " +
expectedRows + " rows while it has " + rows + " rows.");
+                     }
+                  }
+                  connection.commit();
+                  return;
+               } else {
+                  sqls = Stream.of(sqls).filter(sql -> {
+                     final String upperCaseSql = sql.toUpperCase();
+                     return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE
INDEX"));
+                  }).toArray(String[]::new);
+                  if (sqls.length > 0) {
+                     logger.tracef("Table %s did exist but is empty. Starting initialization.",
tableName);
                   } else {
-                     sqls = Arrays.copyOfRange(sqls, 1, sqls.length);
+                     logger.tracef("Table %s did exist but is empty. Initialization completed:
no initialization statements left.", tableName);
                   }
                }
+            } catch (SQLException e) {
+               logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify
the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL()));
+               try {
+                  connection.rollback();
+               } catch (SQLException rollbackEx) {
+                  logger.debug("Rollback failed while validating initialization of a table",
rollbackEx);
+               }
+               connection.setAutoCommit(false);
+               logger.tracef("Table %s seems to exist, but we can't verify the initialization.
Keep trying to create and initialize.", tableName);
             }
+         }
+         if (sqls.length > 0) {
             try (Statement statement = connection.createStatement()) {
                for (String sql : sqls) {
                   statement.executeUpdate(sql);
@@ -210,9 +247,9 @@ public abstract class AbstractJDBCDriver {
                   }
                }
             }
-         }
 
-         connection.commit();
+            connection.commit();
+         }
       } catch (SQLException e) {
          final String sqlStatements = Stream.of(sqls).collect(Collectors.joining("\n"));
          logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c7651853/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
new file mode 100644
index 0000000..e7ac316
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.UUID;
+
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JdbcSharedStateManagerTest extends ActiveMQTestBase {
+
+   private DatabaseStorageConfiguration dbConf;
+   private SQLProvider sqlProvider;
+
+   @Before
+   public void configure() {
+      dbConf = createDefaultDatabaseStorageConfiguration();
+      sqlProvider = JDBCUtils.getSQLProvider(
+         dbConf.getJdbcDriverClassName(),
+         dbConf.getNodeManagerStoreTableName(),
+         SQLProvider.DatabaseStoreType.NODE_MANAGER);
+   }
+
+   private TestJDBCDriver createFakeDriver(boolean initializeTable) {
+      return TestJDBCDriver.usingConnectionUrl(
+         dbConf.getJdbcConnectionUrl(),
+         dbConf.getJdbcDriverClassName(),
+         sqlProvider,
+         initializeTable);
+   }
+
+   private JdbcSharedStateManager createSharedStateManager() {
+      return JdbcSharedStateManager.usingConnectionUrl(
+         UUID.randomUUID().toString(),
+         dbConf.getJdbcLockExpirationMillis(),
+         dbConf.getJdbcMaxAllowedMillisFromDbTime(),
+         dbConf.getJdbcConnectionUrl(),
+         dbConf.getJdbcDriverClassName(),
+         sqlProvider);
+   }
+
+   @Test(timeout = 10000)
+   public void shouldStartIfTableNotExist() throws Exception {
+      final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
+      try {
+         sharedStateManager.destroy();
+      } finally {
+         sharedStateManager.stop();
+      }
+   }
+
+   @Test(timeout = 10000)
+   public void shouldStartIfTableExistEmpty() throws Exception {
+      final TestJDBCDriver fakeDriver = createFakeDriver(false);
+      fakeDriver.start();
+      final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
+      sharedStateManager.stop();
+      try {
+         fakeDriver.destroy();
+      } finally {
+         fakeDriver.stop();
+      }
+   }
+
+   @Test(timeout = 10000)
+   public void shouldStartIfTableExistInitialized() throws Exception {
+      final TestJDBCDriver fakeDriver = createFakeDriver(true);
+      fakeDriver.start();
+      final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
+      sharedStateManager.stop();
+      try {
+         fakeDriver.destroy();
+      } finally {
+         fakeDriver.stop();
+      }
+   }
+
+   @Test(timeout = 10000)
+   public void shouldStartTwoIfTableNotExist() throws Exception {
+      final JdbcSharedStateManager liveSharedStateManager = createSharedStateManager();
+      final JdbcSharedStateManager backupSharedStateManager = createSharedStateManager();
+      backupSharedStateManager.stop();
+      try {
+         liveSharedStateManager.destroy();
+      } finally {
+         liveSharedStateManager.stop();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c7651853/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
index 52b497a..2df6274 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
@@ -20,21 +20,33 @@ import java.sql.SQLException;
 
 import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.junit.Assert;
 
 public class TestJDBCDriver extends AbstractJDBCDriver {
 
+   public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
+                                                   String jdbcDriverClass,
+                                                   SQLProvider provider) {
+      return usingConnectionUrl(jdbcConnectionUrl, jdbcDriverClass, provider, false);
+   }
 
-   public static TestJDBCDriver usingConnectionUrl(
-         String jdbcConnectionUrl,
-         String jdbcDriverClass,
-         SQLProvider provider) {
-      TestJDBCDriver driver = new TestJDBCDriver();
+   public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
+                                                   String jdbcDriverClass,
+                                                   SQLProvider provider,
+                                                   boolean initialize) {
+      TestJDBCDriver driver = new TestJDBCDriver(initialize);
       driver.setSqlProvider(provider);
       driver.setJdbcConnectionUrl(jdbcConnectionUrl);
       driver.setJdbcDriverClass(jdbcDriverClass);
       return driver;
    }
 
+   private boolean initialize;
+
+   private TestJDBCDriver(boolean initialize) {
+      this.initialize = initialize;
+   }
+
    @Override
    protected void prepareStatements() throws SQLException {
    }
@@ -43,7 +55,14 @@ public class TestJDBCDriver extends AbstractJDBCDriver {
    protected void createSchema() throws SQLException {
       try {
          connection.createStatement().execute(sqlProvider.createNodeManagerStoreTableSQL());
+         if (initialize) {
+            connection.createStatement().execute(sqlProvider.createNodeIdSQL());
+            connection.createStatement().execute(sqlProvider.createStateSQL());
+            connection.createStatement().execute(sqlProvider.createLiveLockSQL());
+            connection.createStatement().execute(sqlProvider.createBackupLockSQL());
+         }
       } catch (SQLException e) {
+         Assert.fail(e.getMessage());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c7651853/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
index 6caae96..1661df9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
 
 import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -26,40 +27,38 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
 import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
-import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
-
 public class JDBCJournalTest extends ActiveMQTestBase {
 
    @Rule
    public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
 
-   private static final String JOURNAL_TABLE_NAME = "MESSAGE_JOURNAL";
-
-   private static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
-
    private JDBCJournalImpl journal;
 
-   private String jdbcUrl;
-
    private ScheduledExecutorService scheduledExecutorService;
 
    private ExecutorService executorService;
 
+   private SQLProvider sqlProvider;
+
+   private DatabaseStorageConfiguration dbConf;
+
    @After
    @Override
    public void tearDown() throws Exception {
@@ -77,20 +76,50 @@ public class JDBCJournalTest extends ActiveMQTestBase {
 
    @Before
    public void setup() throws Exception {
+      dbConf = createDefaultDatabaseStorageConfiguration();
+      sqlProvider = JDBCUtils.getSQLProvider(
+         dbConf.getJdbcDriverClassName(),
+         dbConf.getMessageTableName(),
+         SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL);
       scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
       executorService = Executors.newSingleThreadExecutor();
-      jdbcUrl = "jdbc:derby:target/data;create=true";
-      SQLProvider.Factory factory = new PropertySQLProvider.Factory(DERBY);
-      journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME,
SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService,
new IOCriticalErrorListener() {
+      journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcDriverClassName(),
sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
          @Override
          public void onIOException(Throwable code, String message, SequentialFile file) {
 
          }
-      },5);
+      }, 5);
       journal.start();
    }
 
    @Test
+   public void testRestartEmptyJournal() throws SQLException {
+      Assert.assertTrue(journal.isStarted());
+      Assert.assertEquals(0, journal.getNumberOfRecords());
+      journal.stop();
+      journal.start();
+      Assert.assertTrue(journal.isStarted());
+   }
+
+   @Test
+   public void testConcurrentEmptyJournal() throws SQLException {
+      Assert.assertTrue(journal.isStarted());
+      Assert.assertEquals(0, journal.getNumberOfRecords());
+      final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
+                                                                          dbConf.getJdbcDriverClassName(),
+                                                                          sqlProvider, scheduledExecutorService,
+                                                                          executorService,
(code, message, file) -> {
+         Assert.fail(message);
+      }, 5);
+      secondJournal.start();
+      try {
+         Assert.assertTrue(secondJournal.isStarted());
+      } finally {
+         secondJournal.stop();
+      }
+   }
+
+   @Test
    public void testInsertRecords() throws Exception {
       int noRecords = 10;
       for (int i = 0; i < noRecords; i++) {


Mime
View raw message