activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r664082 - in /activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra: ActiveMQEndpointWorker.java ActiveMQResourceAdapter.java MessageEndpointProxy.java MessageResourceAdapter.java ServerSessionImpl.java ServerSessionPoolImpl.java
Date Fri, 06 Jun 2008 18:59:11 GMT
Author: rajdavies
Date: Fri Jun  6 11:59:11 2008
New Revision: 664082

URL: http://svn.apache.org/viewvc?rev=664082&view=rev
Log:
patch for https://issues.apache.org/activemq/browse/AMQ-1779

Modified:
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/MessageEndpointProxy.java
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java?rev=664082&r1=664081&r2=664082&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java Fri Jun  6 11:59:11 2008
@@ -18,6 +18,7 @@
 
 import java.lang.reflect.Method;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
 import javax.jms.ExceptionListener;
@@ -61,24 +62,23 @@
         }
     }
 
-    protected MessageResourceAdapter adapter;
-    protected ActiveMQEndpointActivationKey endpointActivationKey;
-    protected MessageEndpointFactory endpointFactory;
-    protected WorkManager workManager;
-    protected boolean transacted;
-    protected ActiveMQConnection connection;
-
+    protected final ActiveMQEndpointActivationKey endpointActivationKey;
+    protected final MessageEndpointFactory endpointFactory;
+    protected final WorkManager workManager;
+    protected final boolean transacted;
+
+    private final ActiveMQDestination dest;
+    private final Work connectWork;
+    private final AtomicBoolean connecting = new AtomicBoolean(false);    
+    private final String shutdownMutex = "shutdownMutex";
+    
+    private ActiveMQConnection connection;
     private ConnectionConsumer consumer;
     private ServerSessionPoolImpl serverSessionPool;
-    private ActiveMQDestination dest;
     private boolean running;
-    private Work connectWork;
-
-    private long reconnectDelay = INITIAL_RECONNECT_DELAY;
 
-    public ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
+    protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
         this.endpointActivationKey = key;
-        this.adapter = adapter;
         this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
         this.workManager = adapter.getBootstrapContext().getWorkManager();
         try {
@@ -88,44 +88,99 @@
         }
 
         connectWork = new Work() {
+            long currentReconnectDelay = INITIAL_RECONNECT_DELAY;
 
             public void release() {
                 //
             }
 
             public synchronized void run() {
-                if (!isRunning()) {
-                    return;
-                }
-                if (connection != null) {
-                    return;
+                currentReconnectDelay = INITIAL_RECONNECT_DELAY;
+                MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
+                if ( LOG.isInfoEnabled() ) {
+                    LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                 }
 
-                MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
+                while ( connecting.get() && running ) {
                 try {
                     connection = adapter.makeConnection(activationSpec);
-                    connection.start();
                     connection.setExceptionListener(new ExceptionListener() {
                         public void onException(JMSException error) {
                             if (!serverSessionPool.isClosing()) {
-                                reconnect(error);
+                                    // initiate reconnection only once, i.e. on initial exception
+                                    // and only if not already trying to connect
+                                    LOG.error("Connection to broker failed: " + error.getMessage(), error);
+                                    if ( connecting.compareAndSet(false, true) ) {
+                                        synchronized ( connectWork ) {
+                                            disconnect();
+                                            serverSessionPool.closeIdleSessions();
+                                            connect();
                             }
+                                    } else {
+                                        // connection attempt has already been initiated
+                                        LOG.info("Connection attempt already in progress, ignoring connection exception");
                         }
+                                }
+                            }
                     });
+                        connection.start();
 
+                        int prefetchSize = activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
                     if (activationSpec.isDurableSubscription()) {
-                        consumer = connection.createDurableConnectionConsumer((Topic)dest, activationSpec.getSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), serverSessionPool,
-                                                                              activationSpec.getMaxMessagesPerSessionsIntValue(), activationSpec.getNoLocalBooleanValue());
+                            consumer = connection.createDurableConnectionConsumer(
+                                    (Topic) dest,
+                                    activationSpec.getSubscriptionName(), 
+                                    emptyToNull(activationSpec.getMessageSelector()),
+                                    serverSessionPool, 
+                                    prefetchSize,
+                                    activationSpec.getNoLocalBooleanValue());
                     } else {
-                        consumer = connection.createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, activationSpec.getMaxMessagesPerSessionsIntValue(),
+                            consumer = connection.createConnectionConsumer(
+                                    dest, 
+                                    emptyToNull(activationSpec.getMessageSelector()), 
+                                    serverSessionPool, 
+                                    prefetchSize,
                                                                        activationSpec.getNoLocalBooleanValue());
                     }
 
+
+                        if ( connecting.compareAndSet(true, false) ) {
+                            if ( LOG.isInfoEnabled() ) {
+                                LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
+                            }
+                        } else {
+                            LOG.error("Could not release connection lock");
+                        }
                 } catch (JMSException error) {
-                    LOG.debug("Fail to to connect: " + error, error);
-                    reconnect(error);
+                        if ( LOG.isDebugEnabled() ) {
+                            LOG.debug("Failed to connect: " + error.getMessage(), error);
+                }
+                        disconnect();
+                        pause(error);
+            }
                 }
             }
+            
+            private void pause(JMSException error) {
+                if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
+                    LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: "
+                            + error.getMessage(), error);
+                    LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
+                }
+                try {
+                    synchronized ( shutdownMutex ) {
+                        // shutdownMutex will be notified by stop() method in
+                        // order to accelerate shutdown of endpoint
+                        shutdownMutex.wait(currentReconnectDelay);
+                    }
+                } catch ( InterruptedException e ) {
+                    Thread.interrupted();
+                }
+                currentReconnectDelay *= 2;
+                if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
+                    currentReconnectDelay = MAX_RECONNECT_DELAY;
+                }                
+            }
         };
 
         MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
@@ -140,24 +195,12 @@
     }
 
     /**
-     * @param s
-     */
-    public static void safeClose(Session s) {
-        try {
-            if (s != null) {
-                s.close();
-            }
-        } catch (JMSException e) {
-            //
-        }
-    }
-
-    /**
      * @param c
      */
     public static void safeClose(Connection c) {
         try {
             if (c != null) {
+                LOG.debug("Closing connection to broker");
                 c.close();
             }
         } catch (JMSException e) {
@@ -171,6 +214,7 @@
     public static void safeClose(ConnectionConsumer cc) {
         try {
             if (cc != null) {
+                LOG.debug("Closing ConnectionConsumer");
                 cc.close();
             }
         } catch (JMSException e) {
@@ -181,35 +225,44 @@
     /**
      * 
      */
-    public synchronized void start() throws WorkException, ResourceException {
-        if (running) {
+    public void start() throws ResourceException {
+        synchronized (connectWork) {
+            if (running)
             return;
-        }
         running = true;
 
-        LOG.debug("Starting");
+            if ( connecting.compareAndSet(false, true) ) {
+                LOG.info("Starting");
         serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
         connect();
-        LOG.debug("Started");
+            } else {
+                LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
+    }
+        }
     }
 
     /**
      * 
      */
-    public synchronized void stop() throws InterruptedException {
-        if (!running) {
+    public void stop() throws InterruptedException {
+        synchronized (shutdownMutex) {
+            if (!running)
             return;
-        }
         running = false;
+            LOG.info("Stopping");
+            // wake up pausing reconnect attempt
+            shutdownMutex.notifyAll();
         serverSessionPool.close();
         disconnect();
     }
+    }
 
     private boolean isRunning() {
         return running;
     }
 
-    private synchronized void connect() {
+    private void connect() {
+        synchronized ( connectWork ) {
         if (!running) {
             return;
         }
@@ -221,45 +274,19 @@
             LOG.error("Work Manager did not accept work: ", e);
         }
     }
+    }
 
     /**
      * 
      */
-    private synchronized void disconnect() {
+    private void disconnect() {
+        synchronized ( connectWork ) {
         safeClose(consumer);
         consumer = null;
         safeClose(connection);
         connection = null;
     }
-
-    private void reconnect(JMSException error) {
-        LOG.debug("Reconnect cause: ", error);
-        long reconnectDelay;
-        synchronized (this) {
-            reconnectDelay = this.reconnectDelay;
-            // Only log errors if the server is really down.. And not a temp
-            // failure.
-            if (reconnectDelay == MAX_RECONNECT_DELAY) {
-                LOG.error("Endpoint connection to JMS broker failed: " + error.getMessage());
-                LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
             }
-        }
-        try {
-            disconnect();
-            Thread.sleep(reconnectDelay);
-
-            synchronized (this) {
-                // Use exponential rollback.
-                this.reconnectDelay *= 2;
-                if (this.reconnectDelay > MAX_RECONNECT_DELAY) {
-                    this.reconnectDelay = MAX_RECONNECT_DELAY;
-                }
-            }
-            connect();
-        } catch (InterruptedException e) {
-            //
-        }
-    }
 
     protected void registerThreadSession(Session session) {
         THREAD_LOCAL.set(session);
@@ -269,6 +296,16 @@
         THREAD_LOCAL.set(null);
     }
 
+    protected ActiveMQConnection getConnection() {
+        // make sure we only return a working connection
+        // in particular make sure that we do not return null
+        // after the resource adapter got disconnected from
+        // the broker via the disconnect() method
+        synchronized ( connectWork ) {
+            return connection;
+        }
+    }
+
     private String emptyToNull(String value) {
         if (value == null || value.length() == 0) {
             return null;

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java?rev=664082&r1=664081&r2=664082&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java Fri Jun  6 11:59:11 2008
@@ -71,6 +71,7 @@
         this.bootstrapContext = bootstrapContext;
         if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
             brokerStartThread = new Thread("Starting ActiveMQ Broker") {
+                @Override
                 public void run () {
                     try {
                         synchronized( ActiveMQResourceAdapter.this ) {
@@ -110,21 +111,21 @@
      * @param activationSpec
      */
     public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
-        ActiveMQConnectionFactory connectionFactory = this.connectionFactory;
-        if (connectionFactory == null) {
-            connectionFactory = createConnectionFactory(getInfo());
+        ActiveMQConnectionFactory cf = getConnectionFactory();
+        if (cf == null) {
+            cf = createConnectionFactory(getInfo());
         }
         String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
         String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
         String clientId = activationSpec.getClientId();
         if (clientId != null) {
-            connectionFactory.setClientID(clientId);
+            cf.setClientID(clientId);
         } else {
             if (activationSpec.isDurableSubscription()) {
                 log.warn("No clientID specified for durable subscription: " + activationSpec);
             }
         }
-        ActiveMQConnection physicalConnection = (ActiveMQConnection)connectionFactory.createConnection(userName, password);
+        ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password);
 
         // have we configured a redelivery policy
         RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
@@ -318,8 +319,8 @@
         return connectionFactory;
     }
 
-    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
-        this.connectionFactory = connectionFactory;
+    public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) {
+        this.connectionFactory = aConnectionFactory;
     }
 
 

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/MessageEndpointProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/MessageEndpointProxy.java?rev=664082&r1=664081&r2=664082&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/MessageEndpointProxy.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/MessageEndpointProxy.java Fri Jun  6 11:59:11 2008
@@ -22,6 +22,8 @@
 import javax.jms.MessageListener;
 import javax.resource.ResourceException;
 import javax.resource.spi.endpoint.MessageEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @author <a href="mailto:michael.gaffney@panacya.com">Michael Gaffney </a>
@@ -30,6 +32,7 @@
 
     private static final MessageEndpointState ALIVE = new MessageEndpointAlive();
     private static final MessageEndpointState DEAD = new MessageEndpointDead();
+    private static final Log LOG = LogFactory.getLog(MessageEndpointProxy.class);
 
     private static int proxyCount;
     private final int proxyID;
@@ -52,18 +55,22 @@
     }
 
     public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
+        LOG.trace("Invoking MessageEndpoint.beforeDelivery()");
         state.beforeDelivery(this, method);
     }
 
     public void onMessage(Message message) {
+        LOG.trace("Invoking MessageEndpoint.onMethod()");
         state.onMessage(this, message);
     }
 
     public void afterDelivery() throws ResourceException {
+        LOG.trace("Invoking MessageEndpoint.afterDelivery()");
         state.afterDelivery(this);
     }
 
     public void release() {
+        LOG.trace("Invoking MessageEndpoint.release()");
         state.release(this);
     }
 

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java?rev=664082&r1=664081&r2=664082&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java Fri Jun  6 11:59:11 2008
@@ -30,7 +30,7 @@
  * 
  * @version $Revision$
  */
-interface MessageResourceAdapter extends ResourceAdapter {
+public interface MessageResourceAdapter extends ResourceAdapter {
 
     /**
      */

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java?rev=664082&r1=664081&r2=664082&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java Fri Jun  6 11:59:11 2008
@@ -104,6 +104,10 @@
         return session;
     }
 
+    protected boolean isStale() {
+        return stale || !session.isRunning();
+    }
+
     public MessageProducer getMessageProducer() throws JMSException {
         if (messageProducer == null) {
             messageProducer = getSession().createProducer(null);
@@ -156,12 +160,12 @@
      */
     public void run() {
         log.debug("Running");
+        currentBatchSize = 0;
         while (true) {
             log.debug("run loop start");
             try {
-                if ( session.isRunning() ) {
                 InboundContextSupport.register(this);
-                currentBatchSize = 0;
+                if ( session.isRunning() ) {
                 session.run();
                 } else {
                     log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale");
@@ -169,8 +173,11 @@
                 }
             } catch (Throwable e) {
                 stale = true;
+                if ( log.isInfoEnabled() ) {
+                    log.info("Endpoint failed to process message. Reason: " + e.getMessage());
                   
+                } else if ( log.isDebugEnabled() ) {
                 log.debug("Endpoint failed to process message.", e);
-                log.info("Endpoint failed to process message. Reason: " + e);
+                }
             } finally {
                 InboundContextSupport.unregister(this);
                 log.debug("run loop end");
@@ -224,7 +231,7 @@
                         // Sanitiy Check: If the local transaction has not been
                         // commited..
                         // Commit it now.
-                        log.warn("Local transaction had not been commited.  Commiting now.");
+                        log.warn("Local transaction had not been commited. Commiting now.");
                     }
                     try {
                         session.commit();
@@ -246,6 +253,7 @@
     /**
      * @see java.lang.Object#toString()
      */
+    @Override
     public String toString() {
         return "ServerSessionImpl:" + serverSessionId;
     }
@@ -254,12 +262,12 @@
         try {
             endpoint.release();
         } catch (Throwable e) {
-            log.debug("Endpoint did not release properly: " + e, e);
+            log.debug("Endpoint did not release properly: " + e.getMessage(), e);
         }
         try {
             session.close();
         } catch (Throwable e) {
-            log.debug("Session did not close properly: " + e, e);
+            log.debug("Session did not close properly: " + e.getMessage(), e);
         }
     }
 

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java?rev=664082&r1=664081&r2=664082&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java Fri Jun  6 11:59:11 2008
@@ -60,7 +60,7 @@
     private ServerSessionImpl createServerSessionImpl() throws JMSException {
         MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
         int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
-        final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
+        final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.getConnection().createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
         MessageEndpoint endpoint;
         try {
             int batchSize = 0;
@@ -188,13 +188,21 @@
     }
 
     public void returnToPool(ServerSessionImpl ss) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Session returned to pool: " + ss);
-        }
         sessionLock.lock();
-        try {
             activeSessions.remove(ss);
+        try {
+            // make sure we only return non-stale sessions to the pool
+            if ( ss.isStale() ) {
+                if ( LOG.isDebugEnabled() ) {
+                    LOG.debug("Discarding stale ServerSession to be returned to pool: " + ss);
+                }
+                ss.close();
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ServerSession returned to pool: " + ss);
+                }
             idleSessions.add(ss);
+            }
         } finally {
             sessionLock.unlock();
         }
@@ -243,7 +251,7 @@
         } else if (s instanceof ActiveMQTopicSession) {
             session = (ActiveMQSession) s;
         } else {
-            activeMQAsfEndpointWorker.connection
+            activeMQAsfEndpointWorker.getConnection()
                     .onAsyncException(new JMSException(
                             "Session pool provided an invalid session type: "
                                     + s.getClass()));
@@ -275,7 +283,7 @@
     }
 
 
-    private int closeIdleSessions() {
+    protected int closeIdleSessions() {
         sessionLock.lock();
         try {
             for (ServerSessionImpl ss : idleSessions) {



Mime
View raw message