activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r685988 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ main/resources/META-INF/services/org/apache/activemq/store/jdbc/ test/java/org/apache/activemq/ test...
Date Thu, 14 Aug 2008 18:33:49 GMT
Author: gtully
Date: Thu Aug 14 11:33:48 2008
New Revision: 685988

URL: http://svn.apache.org/viewvc?rev=685988&view=rev
Log:
fix for AMQ-1885, allow jdbc slave broker to outlive a db outage

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=685988&r1=685987&r2=685988&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Aug 14 11:33:48 2008
@@ -176,6 +176,13 @@
                 LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
             } else {
                 service.start();
+                if (lockKeepAlivePeriod > 0) {
+                    getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
+                        public void run() {
+                            databaseLockKeepAlive();
+                        }
+                    }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
+                }
                 if (brokerService != null) {
                     brokerService.getBroker().nowMasterBroker();
                 }
@@ -258,13 +265,6 @@
     public DatabaseLocker getDatabaseLocker() throws IOException {
         if (databaseLocker == null) {
             databaseLocker = createDatabaseLocker();
-            if (lockKeepAlivePeriod > 0) {
-                getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
-                    public void run() {
-                        databaseLockKeepAlive();
-                    }
-                }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
-            }
         }
         return databaseLocker;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=685988&r1=685987&r2=685988&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Aug 14 11:33:48 2008
@@ -122,6 +122,7 @@
                 // This will fail usually since the tables will be
                 // created already.
                 try {
+                    LOG.debug("Executing SQL: " + dropStatments[i]);
                     s.execute(dropStatments[i]);
                 } catch (SQLException e) {
                     LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: "
@@ -187,7 +188,9 @@
             }
         } finally {
             if (!batchStatments) {
-                s.close();
+                if (s!=null) {
+                    s.close();
+                }
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver?rev=685988&r1=685987&r2=685988&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver (original)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver Thu Aug 14 11:33:48 2008
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter
\ No newline at end of file
+class=org.apache.activemq.store.jdbc.adapter.DB2JDBCAdapter

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java?rev=685988&r1=685987&r2=685988&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java Thu Aug 14 11:33:48 2008
@@ -85,7 +85,6 @@
      */
     public void testSendReceive() throws Exception {
         messages.clear();
-
         for (int i = 0; i < data.length; i++) {
             Message message = session.createTextMessage(data[i]);
             message.setStringProperty("stringProperty", data[i]);
@@ -97,7 +96,7 @@
                 }
             }
 
-            producer.send(producerDestination, message);
+            sendToProducer(producer, producerDestination, message);
             messageSent();
         }
 
@@ -106,6 +105,18 @@
     }
 
     /**
+     * Sends a message to a destination using the supplied producer
+     * @param producer
+     * @param producerDestination
+     * @param message
+     * @throws JMSException
+     */
+    protected void sendToProducer(MessageProducer producer,
+            Destination producerDestination, Message message) throws JMSException {
+        producer.send(producerDestination, message);   
+    }
+
+    /**
      * Asserts messages are received.
      * 
      * @throws JMSException

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java?rev=685988&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java Thu Aug 14 11:33:48 2008
@@ -0,0 +1,53 @@
+package org.apache.activemq.broker.ft;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
+    private static final transient Log LOG = LogFactory.getLog(DbRestartJDBCQueueMasterSlaveTest.class);
+    
+    protected void messageSent() throws Exception {    
+        if (++inflightMessageCount == failureCount) {
+            final EmbeddedDataSource ds = getExistingDataSource();
+            ds.setShutdownDatabase("shutdown");
+            LOG.info("DB STOPPED!@!!!!");
+            
+            Thread dbRestartThread = new Thread("db-re-start-thread") {
+                public void run() {
+                    LOG.info("Waiting for master broker to Stop");
+                    master.waitUntilStopped();
+                    ds.setShutdownDatabase("false");
+                    LOG.info("DB RESTARTED!@!!!!");
+                }
+            };
+            dbRestartThread.start();
+        }
+    }
+     
+    protected void sendToProducer(MessageProducer producer,
+            Destination producerDestination, Message message) throws JMSException {
+        {   
+            // do some retries as db failures filter back to the client until broker sees
+            // db lock failure and shuts down
+            boolean sent = false;
+            do {
+                try { 
+                    producer.send(producerDestination, message);
+                    sent = true;
+                } catch (JMSException e) {
+                    LOG.info("Exception on producer send:", e);
+                    try { 
+                        Thread.sleep(2000);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+            } while(!sent);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java?rev=685988&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java Thu Aug 14 11:33:48 2008
@@ -0,0 +1,62 @@
+package org.apache.activemq.broker.ft;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
+    protected EmbeddedDataSource sharedDs;
+    protected String MASTER_URL = "tcp://localhost:62001";
+    protected String SLAVE_URL  = "tcp://localhost:62002";
+
+    protected void setUp() throws Exception {
+        // startup db
+        sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource();
+        super.setUp();
+    }
+    
+    protected void createMaster() throws Exception {
+        master = new BrokerService();
+        master.addConnector(MASTER_URL);
+        master.setUseJmx(false);
+        master.setPersistent(true);
+        master.setDeleteAllMessagesOnStartup(true);
+        JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+        persistenceAdapter.setDataSource(getExistingDataSource());
+        persistenceAdapter.setLockKeepAlivePeriod(500);
+        master.setPersistenceAdapter(persistenceAdapter);
+        master.start();
+    }
+
+    protected void createSlave() throws Exception {
+        // use a separate thread as the slave will block waiting for
+        // the exclusive db lock
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    BrokerService broker = new BrokerService();
+                    broker.addConnector(SLAVE_URL);
+                    // no need for broker.setMasterConnectorURI(masterConnectorURI)
+                    // as the db lock provides the slave/master initialisation
+                    broker.setUseJmx(false);
+                    broker.setPersistent(true);
+                    JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+                    persistenceAdapter.setDataSource(getExistingDataSource());
+                    persistenceAdapter.setCreateTablesOnStartup(false);
+                    broker.setPersistenceAdapter(persistenceAdapter);
+                    broker.start();
+                    slave.set(broker);
+                    slaveStarted.countDown();
+                } catch (Exception e) {
+                    fail("failed to start slave broker, reason:" + e);
+                }
+            }
+        };
+        t.start();
+    }
+
+    protected EmbeddedDataSource getExistingDataSource() throws Exception {
+        return sharedDs;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=685988&r1=685987&r2=685988&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Thu Aug 14 11:33:48 2008
@@ -82,8 +82,7 @@
     }
 
     protected void messageSent() throws Exception {
-        if (++inflightMessageCount >= failureCount) {
-            inflightMessageCount = 0;
+        if (++inflightMessageCount == failureCount) {
             Thread.sleep(1000);
             LOG.error("MASTER STOPPED!@!!!!");
             master.stop();



Mime
View raw message