activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1086182 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/ test/java/org/apache/activemq/broker/ft/
Date Mon, 28 Mar 2011 11:00:06 GMT
Author: gtully
Date: Mon Mar 28 11:00:06 2011
New Revision: 1086182

URL: http://svn.apache.org/viewvc?rev=1086182&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-1780 - ActiveMQ broker does not automatically reconnect
if the connection to the database is lost - extend the DefaultIOExceptionHandler http://activemq.apache.org/configurable-ioexception-handling.html
to be aware of sql exceptions and provide a connector stop/resume option. This is now called
in the event of a failure to get a jdbc connection, the default behaviour is to stop as before,
but new options to ignore some sql exceptions or to stop/resume connectors are supported which
allow a db restart to be recovered without restarting the broker

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Mon Mar 28 11:00:06 2011
@@ -1639,7 +1639,7 @@ public class BrokerService implements Se
         }
     }
 
-    protected void stopAllConnectors(ServiceStopper stopper) {
+    public void stopAllConnectors(ServiceStopper stopper) {
         for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();)
{
             NetworkConnector connector = iter.next();
             unregisterNetworkConnectorMBean(connector);
@@ -2063,7 +2063,7 @@ public class BrokerService implements Se
      * 
      * @throws Exception
      */
-    protected void startAllConnectors() throws Exception {
+    public void startAllConnectors() throws Exception {
         if (!isSlave()) {
             Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
             List<TransportConnector> al = new ArrayList<TransportConnector>();
@@ -2330,12 +2330,22 @@ public class BrokerService implements Se
     public void setPassiveSlave(boolean passiveSlave) {
         this.passiveSlave = passiveSlave;
     }
-    
+
+    /**
+     * override the Default IOException handler, called when persistence adapter
+     * has experiences File or JDBC I/O Exceptions
+     *
+     * @param ioExceptionHandler
+     */
     public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
-        ioExceptionHandler.setBrokerService(this);
+        configureService(ioExceptionHandler);
         this.ioExceptionHandler = ioExceptionHandler;
     }
 
+    public IOExceptionHandler getIoExceptionHandler() {
+        return ioExceptionHandler;
+    }
+
     /**
      * @return the schedulerSupport
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
Mon Mar 28 11:00:06 2011
@@ -88,7 +88,7 @@ public class DefaultDatabaseLocker imple
                                     + exceptionHandler.getClass().getCanonicalName()
                                     + " threw this exception: "
                                     + handlerException
-                                    + " while trying to handle this excpetion: "
+                                    + " while trying to handle this exception: "
                                     + e, handlerException);
                         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Mon Mar 28 11:00:06 2011
@@ -18,6 +18,7 @@ package org.apache.activemq.store.jdbc;
 
 import java.io.File;
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Set;
@@ -489,7 +490,7 @@ public class JDBCPersistenceAdapter exte
     }
 
     public TransactionContext getTransactionContext() throws IOException {
-        TransactionContext answer = new TransactionContext(getDataSource());
+        TransactionContext answer = new TransactionContext(this);
         if (transactionIsolation > 0) {
             answer.setTransactionIsolation(transactionIsolation);
         }
@@ -619,7 +620,7 @@ public class JDBCPersistenceAdapter exte
         try {
             brokerService.stop();
         } catch (Exception e) {
-            LOG.warn("Failure occured while stopping broker");
+            LOG.warn("Failure occurred while stopping broker");
         }
     }
 
@@ -642,7 +643,23 @@ public class JDBCPersistenceAdapter exte
     public void setDirectory(File dir) {
     }
 
+    // interesting bit here is proof that DB is ok
     public void checkpoint(boolean sync) throws IOException {
+        // by pass TransactionContext to avoid IO Exception handler
+        Connection connection = null;
+        try {
+            connection = getDataSource().getConnection();
+        } catch (SQLException e) {
+            LOG.debug("Could not get JDBC connection for checkpoint: " + e);
+            throw IOExceptionSupport.create(e);
+        } finally {
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (Throwable ignored) {
+                }
+            }
+        }
     }
 
     public long size(){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
Mon Mar 28 11:00:06 2011
@@ -38,6 +38,7 @@ public class TransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
 
     private final DataSource dataSource;
+    private final JDBCPersistenceAdapter persistenceAdapter;
     private Connection connection;
     private boolean inTx;
     private PreparedStatement addMessageStatement;
@@ -46,8 +47,9 @@ public class TransactionContext {
     // a cheap dirty level that we can live with    
     private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
     
-    public TransactionContext(DataSource dataSource) {
-        this.dataSource = dataSource;
+    public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException
{
+        this.persistenceAdapter = persistenceAdapter;
+        this.dataSource = persistenceAdapter.getDataSource();
     }
 
     public Connection getConnection() throws IOException {
@@ -60,7 +62,10 @@ public class TransactionContext {
                 }
             } catch (SQLException e) {
                 JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
-                throw IOExceptionSupport.create(e);
+                IOException ioe = IOExceptionSupport.create(e);
+                persistenceAdapter.getBrokerService().handleIOException(ioe);
+                throw ioe;
+
             }
 
             try {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
Mon Mar 28 11:00:06 2011
@@ -17,26 +17,37 @@
 package org.apache.activemq.util;
 
 import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.broker.BrokerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DefaultIOExceptionHandler implements IOExceptionHandler {
+/**
+ * @org.apache.xbean.XBean
+ */
+ public class DefaultIOExceptionHandler implements IOExceptionHandler {
 
     private static final Logger LOG = LoggerFactory
             .getLogger(DefaultIOExceptionHandler.class);
     private BrokerService broker;
     private boolean ignoreAllErrors = false;
     private boolean ignoreNoSpaceErrors = true;
+    private boolean ignoreSQLExceptions = true;
+    private boolean stopStartConnectors = false;
     private String noSpaceMessage = "space";
+    private String sqlExceptionMessage = ""; // match all
+    private long resumeCheckSleepPeriod = 5*1000;
+    private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
 
     public void handle(IOException exception) {
         if (ignoreAllErrors) {
             LOG.info("Ignoring IO exception, " + exception, exception);
             return;
         }
-        
+
         if (ignoreNoSpaceErrors) {
             Throwable cause = exception;
             while (cause != null && cause instanceof IOException) {
@@ -48,13 +59,71 @@ public class DefaultIOExceptionHandler i
             }
         }
 
+        if (ignoreSQLExceptions) {
+            Throwable cause = exception;
+            while (cause != null) {
+                if (cause instanceof SQLException && cause.getMessage().contains(sqlExceptionMessage))
{
+                    LOG.info("Ignoring SQLException, " + exception, cause);
+                    return;
+                }
+                cause = cause.getCause();
+            }
+        }
+
+        if (stopStartConnectors) {
+            if (!stopStartInProgress.compareAndSet(false, true)) {
+                // we are already working on it
+                return;
+            }
+            LOG.info("Initiating stop/restart of broker transport due to IO exception, "
+ exception, exception);
+
+            new Thread("stop transport connectors on IO exception") {
+                public void run() {
+                    try {
+                        ServiceStopper stopper = new ServiceStopper();
+                        broker.stopAllConnectors(stopper);
+                    } catch (Exception e) {
+                        LOG.warn("Failure occurred while stopping broker connectors", e);
+                    }
+                }
+            }.start();
+
+            // resume again
+            new Thread("restart transport connectors post IO exception") {
+                public void run() {
+                    try {
+                        while (isPersistenceAdapterDown()) {
+                            LOG.info("waiting for broker persistence adapter checkpoint to
succeed before restarting transports");
+                            TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
+                        }
+                        broker.startAllConnectors();
+                    } catch (Exception e) {
+                        LOG.warn("Failure occurred while restarting broker connectors", e);
+                    } finally {
+                        stopStartInProgress.compareAndSet(true, false);
+                    }
+                }
+
+                private boolean isPersistenceAdapterDown() {
+                    boolean checkpointSuccess = false;
+                    try {
+                        broker.getPersistenceAdapter().checkpoint(true);
+                        checkpointSuccess = true;
+                    } catch (Throwable ignored) {}
+                    return !checkpointSuccess;
+                }
+            }.start();
+
+            return;
+        }
+
         LOG.info("Stopping the broker due to IO exception, " + exception, exception);
-        new Thread() {
+        new Thread("Stopping the broker due to IO exception") {
             public void run() {
                 try {
                     broker.stop();
                 } catch (Exception e) {
-                    LOG.warn("Failure occured while stopping broker", e);
+                    LOG.warn("Failure occurred while stopping broker", e);
                 }
             }
         }.start();
@@ -88,4 +157,35 @@ public class DefaultIOExceptionHandler i
         this.noSpaceMessage = noSpaceMessage;
     }
 
+    public boolean isIgnoreSQLExceptions() {
+        return ignoreSQLExceptions;
+    }
+
+    public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
+        this.ignoreSQLExceptions = ignoreSQLExceptions;
+    }
+
+    public String getSqlExceptionMessage() {
+        return sqlExceptionMessage;
+    }
+
+    public void setSqlExceptionMessage(String sqlExceptionMessage) {
+        this.sqlExceptionMessage = sqlExceptionMessage;
+    }
+
+    public boolean isStopStartConnectors() {
+        return stopStartConnectors;
+    }
+
+    public void setStopStartConnectors(boolean stopStartConnectors) {
+        this.stopStartConnectors = stopStartConnectors;
+    }
+
+    public long getResumeCheckSleepPeriod() {
+        return resumeCheckSleepPeriod;
+    }
+
+    public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
+        this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
Mon Mar 28 11:00:06 2011
@@ -49,8 +49,8 @@ public class JmsTopicSendReceiveWithTwoC
         LOG.info("Created sendConnection: " + sendConnection);
         LOG.info("Created receiveConnection: " + receiveConnection);
 
-        session = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session = createSendSession(sendConnection);
+        receiveSession = createReceiveSession(receiveConnection);
 
         LOG.info("Created sendSession: " + session);
         LOG.info("Created receiveSession: " + receiveSession);
@@ -80,6 +80,14 @@ public class JmsTopicSendReceiveWithTwoC
         LOG.info("Started connections");
     }
 
+    protected Session createReceiveSession(Connection receiveConnection) throws Exception
{
+        return receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    protected Session createSendSession(Connection sendConnection) throws Exception {
+        return sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
     protected Connection createReceiveConnection() throws Exception {
         return createConnection();
     }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java?rev=1086182&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
Mon Mar 28 11:00:06 2011
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueTest extends JmsTopicSendReceiveWithTwoConnectionsTest implements
ExceptionListener {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueTest.class);
+
+    public boolean transactedSends = false;
+    public int failureCount = 25;  // or 20 for even tx batch boundary
+
+    int inflightMessageCount = 0;
+    EmbeddedDataSource sharedDs;
+    BrokerService broker;
+    final CountDownLatch restartDBLatch = new CountDownLatch(1);
+
+    protected void setUp() throws Exception {
+        setAutoFail(true);
+        topic = false;
+        verbose = true;
+        // startup db
+        sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource();
+
+        broker = new BrokerService();
+
+        DefaultIOExceptionHandler handler = new DefaultIOExceptionHandler();
+        handler.setIgnoreSQLExceptions(false);
+        handler.setStopStartConnectors(true);
+        broker.setIoExceptionHandler(handler);
+        broker.addConnector("tcp://localhost:0");
+        broker.setUseJmx(false);
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+        persistenceAdapter.setDataSource(sharedDs);
+        persistenceAdapter.setUseDatabaseLock(false);
+        persistenceAdapter.setLockKeepAlivePeriod(500);
+        persistenceAdapter.setLockAcquireSleepInterval(500);
+        broker.setPersistenceAdapter(persistenceAdapter);
+        broker.start();
+        super.setUp();
+    }
+
+    protected void tearDown() throws  Exception {
+       super.tearDown();
+        broker.stop();
+    }
+
+
+    protected Session createSendSession(Connection sendConnection) throws Exception {
+        if (transactedSends) {
+            return sendConnection.createSession(true, Session.SESSION_TRANSACTED);
+        } else {
+            return sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory f =
+                new ActiveMQConnectionFactory("failover://" + broker.getTransportConnectors().get(0).getPublishableConnectString());
+        f.setExceptionListener(this);
+        return f;
+    }
+
+    @Override
+    protected void messageSent() throws Exception {    
+        if (++inflightMessageCount == failureCount) {
+            LOG.info("STOPPING DB!@!!!!");
+            final EmbeddedDataSource ds = sharedDs;
+            ds.setShutdownDatabase("shutdown");
+            try {
+                ds.getConnection();
+            } catch (Exception ignored) {
+            }
+            LOG.info("DB STOPPED!@!!!!");
+            
+            Thread dbRestartThread = new Thread("db-re-start-thread") {
+                public void run() {
+                    LOG.info("Sleeping for 10 seconds before allowing db restart");
+                    try {
+                        restartDBLatch.await(10, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    ds.setShutdownDatabase("false");
+                    LOG.info("DB RESTARTED!@!!!!");
+                }
+            };
+            dbRestartThread.start();
+        }
+    }
+     
+    protected void sendToProducer(MessageProducer producer,
+            Destination producerDestination, Message message) throws JMSException {
+        {   
+            // do some retries as db failures filter back to the client until broker sees
+            // db lock failure and shuts down
+            boolean sent = false;
+            do {
+                try { 
+                    producer.send(producerDestination, message);
+
+                    if (transactedSends && ((inflightMessageCount+1) %10 == 0 ||
(inflightMessageCount+1) >= messageCount)) {
+                        LOG.info("committing on send: " + inflightMessageCount + " message:
" + message);
+                        session.commit();
+                    }
+
+                    sent = true;
+                } catch (JMSException e) {
+                    LOG.info("Exception on producer send:", e);
+                    try { 
+                        Thread.sleep(2000);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+            } while(!sent);
+
+        }
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        LOG.error("exception on connection: ", exception);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message