qpid-commits mailing list archives

From wpr...@apache.org
Subject svn commit: r1333473 - in /qpid/trunk/qpid/java/jca: ./ example/src/main/java/org/apache/qpid/jca/example/ejb/ src/main/java/org/apache/qpid/ra/ src/main/java/org/apache/qpid/ra/admin/ src/main/java/org/apache/qpid/ra/inflow/ src/main/resources/META-IN...
Date Thu, 03 May 2012 14:23:04 GMT
Author: wprice
Date: Thu May  3 14:23:03 2012
New Revision: 1333473

URL: http://svn.apache.org/viewvc?rev=1333473&view=rev
Log:
QPID-3878:
    QpidActivation should use connection per inbound listener
    Added ability to configure inbound listener multiplex behavior
    Added new property to ra.xml
    Updated README.txt to reflect changes
    Added Serialization unit tests for ActivationSpec and
    ResourceAdapter

    Contributions from Kevin Conner

Added:
    qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
    qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java
      - copied, changed from r1333150, qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java
Modified:
    qpid/trunk/qpid/java/jca/README.txt
    qpid/trunk/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java
    qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
    qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
    qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
    qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
    qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
    qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
    qpid/trunk/qpid/java/jca/src/main/resources/META-INF/ra.xml
    qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java

Modified: qpid/trunk/qpid/java/jca/README.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/README.txt?rev=1333473&r1=1333472&r2=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/README.txt (original)
+++ qpid/trunk/qpid/java/jca/README.txt Thu May  3 14:23:03 2012
@@ -7,7 +7,8 @@ for JEE integration between EE applicati
 
 The Qpid JCA adapter provides both outbound and inbound connectivity and
 exposes a variety of options to fine tune your messaging applications. Currently
-the adapter only supports C++ based brokers and has only been tested with Apache Qpid C++ broker.
+the adapter only supports C++ based brokers and has only been tested with Apache
+Qpid C++ broker.
 
 The following document explains general configuration information for the Qpid JCA RA. Details for
 specific application server platforms are provided in separate README files typically designated as
@@ -38,22 +39,22 @@ encouraged. Similarly, familiarity with 
 The ResourceAdapter JavaBean
 ============================
 
-The ResourceAdapter JavaBean provides global configuration options for both inbound and outbound connectivity.
-The set of ResourceAdapter properties are described below. The ResourceAdapter properties can be found in the META-INF/ra.xml
-deployment descriptor which is provided with the adapter. Note, deploying a ResourceAdapter, ManagedConnectionFactory
-or ActivationSpec is application server specific. As such, this document provides an explanation of these properties
-but not how they are configured as this is environment specific.
+The ResourceAdapter JavaBean provides global configuration options for both inbound and outbound
+connectivity. The set of ResourceAdapter properties are described below. The ResourceAdapter properties
+can be found in the META-INF/ra.xml deployment descriptor which is provided with the adapter. Note,
+deploying a ResourceAdapter, ManagedConnectionFactory or ActivationSpec is application server specific.
+As such, this document provides an explanation of these properties but not how they are configured.
 
 ResourceAdapter JavaBean Properties
 ===================================
 
-ClientID
+ClientId
    The unique client identifier. From the JMS API this is used only in the context of durable subscriptions.
 Default: client_id
 
 SetupAttempts
-    The number of attempts the ResourceAdapter will make to successfully setup an inbound activation on deployment, or when an exception
-    occurs at runtime.
+    The number of attempts the ResourceAdapter will make to successfully setup an inbound activation on deployment,
+    or when an exception occurs at runtime.
 Default: 5
 
 SetupInterval
@@ -92,9 +93,11 @@ TransactionManagerLocatorMethod
     server specific as such, no default is provided.
 Default:none
 
-Note, both the TransactionManagerLocatorClass and the TransactionManagerLocatorMethod
-properties must be set. While application servers typically provide a mechanism to do this in the form of
-a specific deployment descriptor, or GUI console, the ra.xml file can also be modified directly.
+UseConnectionPerHandler
+    The Apache C++ Broker multiplexes on the physical connection rather than the session. As a result, performance
+    improvements can be gained by allocating and assigning a connection per inbound listener. The alternative is
+    to share a connection across each handler for a given endpoint (MDB).
+Default:true
 
 The ManagedConnectionFactory JavaBean
 =====================================
@@ -176,11 +179,11 @@ Both these administered objects have pro
 
 QpidQueue/QpidTopic
 ====================
-    The QpidQueue/QpidTopic AdminObjects allow a developer, deployer or adminstrator to create destinations
-    (queues or topic) and bind these destinations into JNDI. Only one property is required:
+The QpidQueue and QpidTopic AdminObjects allow binding JMS destintations into the JEE JNDI namespace. Both
+objects support one property:
 
 DestinationAddress
-    The address string of the destination. Please see the Qpid Java JMS client documentation for valid values.
+    The address string of the destination. Please see the Qpid JMS client documentation for valid values.
 
 Example:
    DestinationAddress=hello.Queue;{create:always, node:{type:queue, x-declare:{auto-delete:true}}}
@@ -188,13 +191,13 @@ Example:
 
 QpidConnectionFactoryProxy
 ==========================
-    The QpidConnectionFactoryProxy allows for a non-JCA ConnectionFactory to be bound into the JNDI tree. This
-    ConnectionFactory can in turn be used outside of the application server. Typically a ConnectionFactory of
-    this sort is used by Java Swing or other non-managed clients not requiring JCA. One one property is
-    required:
+The QpidConnectionFactoryProxy allows for a non-JCA ConnectionFactory to be bound into the JNDI tree. This
+ConnectionFactory can in turn be used outside of the application server. Typically a ConnectionFactory of
+this sort is used by Swing or other non-managed clients not requiring JCA. The QpidConnectionFactoryProxy
+provides one property
 
 ConnectionURL
-    This is the url used to configure the connection factory. Please see the Qpid Java Client documentation for
+    This is the url used to configure the connection factory. Please see the Qpid JMS client documentation for
     further details.
 
 Example:
@@ -205,10 +208,11 @@ Transaction Support
 The Qpid JCA Resource Adapter provides three levels of transaction support: XA, LocalTransactions and NoTransaction.
 Typical usage of the Qpid JCA Resource adapter implies the use of XA transactions, though there are certain scenarios
 where this is not preferred. Transaction support configuration is application server specific and as such, is explained
-in the corresponding documentation for each supported application server. Current limitations with XA are listed
-below:
+in the corresponding documentation for each supported application server.
 
-1)XARecovery is currently only supported for JBoss EAP 5.x and is not supported for clustered broker configurations.
+XA recovery, that is being able to recover 'in-doubt' transactions for a given resource manager is non-standardized,
+and as such is application server specific. Currently, the Qpid JCA adapter only supports recovery for JBoss EAP 5.x
+as a separate module.
 
 Conclusion
 ==========

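The UseConnectionPerHandler text added to the README describes two ways of assigning connections to inbound handlers. The following is a minimal sketch of that choice only, not the adapter's code: `DemoConnection`, `DemoHandler` and `ConnectionPlanSketch` are hypothetical stand-ins, and no Qpid or JMS classes are used.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ConnectionPlanSketch
{
    // Hypothetical marker for one physical broker connection (not a Qpid class).
    static class DemoConnection
    {
    }

    // Hypothetical inbound handler that simply remembers its connection.
    static class DemoHandler
    {
        private final DemoConnection connection;

        DemoHandler(DemoConnection connection)
        {
            this.connection = connection;
        }

        DemoConnection getConnection()
        {
            return connection;
        }
    }

    // Creates maxSession handlers: one connection each when the flag is true,
    // otherwise a single connection shared by every handler of the endpoint.
    static List<DemoHandler> plan(boolean useConnectionPerHandler, int maxSession)
    {
        DemoConnection shared = useConnectionPerHandler ? null : new DemoConnection();
        List<DemoHandler> handlers = new ArrayList<DemoHandler>();
        for (int i = 0; i < maxSession; i++)
        {
            DemoConnection connection = useConnectionPerHandler ? new DemoConnection() : shared;
            handlers.add(new DemoHandler(connection));
        }
        return handlers;
    }

    // Counts how many different connection objects the handlers hold.
    static int distinctConnections(List<DemoHandler> handlers)
    {
        Set<DemoConnection> seen = new HashSet<DemoConnection>();
        for (DemoHandler handler : handlers)
        {
            seen.add(handler.getConnection());
        }
        return seen.size();
    }

    public static void main(String[] args)
    {
        System.out.println(distinctConnections(plan(true, 3)));   // prints 3
        System.out.println(distinctConnections(plan(false, 3)));  // prints 1
    }
}
```

With the flag set, the number of connections grows with the number of handlers; with it cleared, all handlers for the endpoint share one connection.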
Modified: qpid/trunk/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java?rev=1333473&r1=1333472&r2=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java (original)
+++ qpid/trunk/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java Thu May  3 14:23:03 2012
@@ -66,10 +66,6 @@ public class QpidHelloListenerBean imple
 
         try
         {
-            _log.info(message.getJMSDestination().getClass().getName());
-
-            javax.jms.Queue queue = (javax.jms.Queue)message.getJMSDestination();
-            _log.info("QueueName is: " + queue.getQueueName());
             if(message instanceof TextMessage)
             {
                 String content = ((TextMessage)message).getText();

Modified: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java?rev=1333473&r1=1333472&r2=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java (original)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java Thu May  3 14:23:03 2012
@@ -31,10 +31,8 @@ import org.slf4j.LoggerFactory;
  */
 public class QpidRAProperties extends ConnectionFactoryProperties implements Serializable
 {
-   /** Serial version UID */
    private static final long serialVersionUID = -4823893873707374791L;
 
-   /** The logger */
    private static final Logger _log = LoggerFactory.getLogger(QpidRAProperties.class);
 
    private static final int DEFAULT_SETUP_ATTEMPTS = 10;
@@ -45,16 +43,14 @@ public class QpidRAProperties extends Co
 
    private long _setupInterval = DEFAULT_SETUP_INTERVAL;
 
-   /** Use Local TX instead of XA */
-   private Boolean _localTx = false;
-
    /** Class used to locate the Transaction Manager. */
    private String _transactionManagerLocatorClass ;
 
    /** Method used to locate the TM */
    private String _transactionManagerLocatorMethod ;
 
-
+   private boolean _useConnectionPerHandler = true;
+   
    /**
     * Constructor
     */
@@ -146,10 +142,20 @@ public class QpidRAProperties extends Co
       this._setupInterval = setupInterval;
    }
 
+   public boolean isUseConnectionPerHandler()
+   {
+       return _useConnectionPerHandler;
+   }
+   
+   public void setUseConnectionPerHandler(boolean connectionPerHandler)
+   {
+       this._useConnectionPerHandler = connectionPerHandler;                       
+   }
+   
    @Override
    public String toString()
    {
-      return "QpidRAProperties[localTx=" + _localTx +
+      return "QpidRAProperties[" +
             ", transactionManagerLocatorClass=" + _transactionManagerLocatorClass +
             ", transactionManagerLocatorMethod=" + _transactionManagerLocatorMethod +
             ", setupAttempts=" + _setupAttempts +

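As the toString() hunk above reads, the string now opens with "QpidRAProperties[" and is followed directly by ", transactionManagerLocatorClass=", so the output would appear to begin with a stray separator. One possible way to avoid this is sketched below, assuming Java 8's `java.util.StringJoiner` is available (the adapter itself may target an older Java level). `ToStringSketch` and its field values are hypothetical and only mirror names seen in the diff.

```java
import java.util.StringJoiner;

class ToStringSketch
{
    // Hypothetical values; the field names mirror those in the diff.
    private final String transactionManagerLocatorClass = "com.example.Locator";
    private final String transactionManagerLocatorMethod = "getTm";
    private final boolean useConnectionPerHandler = true;

    @Override
    public String toString()
    {
        // The joiner inserts ", " only between elements, never before the first.
        StringJoiner joiner = new StringJoiner(", ", "ToStringSketch[", "]");
        joiner.add("transactionManagerLocatorClass=" + transactionManagerLocatorClass);
        joiner.add("transactionManagerLocatorMethod=" + transactionManagerLocatorMethod);
        joiner.add("useConnectionPerHandler=" + useConnectionPerHandler);
        return joiner.toString();
    }

    public static void main(String[] args)
    {
        System.out.println(new ToStringSketch());
    }
}
```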
Modified: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java?rev=1333473&r1=1333472&r2=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java (original)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java Thu May  3 14:23:03 2012
@@ -38,8 +38,6 @@ import javax.resource.spi.work.WorkManag
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQConnectionFactory;
 import org.apache.qpid.client.AMQConnectionURL;
@@ -47,6 +45,8 @@ import org.apache.qpid.client.XAConnecti
 import org.apache.qpid.ra.inflow.QpidActivation;
 import org.apache.qpid.ra.inflow.QpidActivationSpec;
 import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The resource adapter for Qpid
@@ -54,43 +54,22 @@ import org.apache.qpid.url.URLSyntaxExce
  */
 public class QpidResourceAdapter implements ResourceAdapter, Serializable
 {
-   /**
-    *
-    */
    private static final long serialVersionUID = -2446231446818098726L;
 
-   /**
-    * The logger
-    */
-   private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
+   private static final transient Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
 
-   /**
-    * The bootstrap context
-    */
    private BootstrapContext _ctx;
 
-   /**
-    * The resource adapter properties
-    */
    private final QpidRAProperties _raProperties;
 
-   /**
-    * Have the factory been configured
-    */
    private final AtomicBoolean _configured;
 
-   /**
-    * The activations by activation spec
-    */
    private final Map<ActivationSpec, QpidActivation> _activations;
 
    private AMQConnectionFactory _defaultAMQConnectionFactory;
 
    private TransactionManager _tm;
 
-   /**
-    * Constructor
-    */
    public QpidResourceAdapter()
    {
       if (_log.isTraceEnabled())
@@ -514,7 +493,27 @@ public class QpidResourceAdapter impleme
       }
       _raProperties.setSetupInterval(interval);
    }
-
+   
+   public Boolean isUseConnectionPerHandler()
+   {
+       if (_log.isTraceEnabled())
+       {
+          _log.trace("isConnectionPerHandler()");
+       }
+       
+       return _raProperties.isUseConnectionPerHandler();
+   }
+
+   public void setUseConnectionPerHandler(Boolean connectionPerHandler)
+   {
+       if (_log.isTraceEnabled())
+       {
+          _log.trace("setConnectionPerHandler(" + connectionPerHandler + ")");
+       }  
+       
+       _raProperties.setUseConnectionPerHandler(connectionPerHandler);
+   }
+   
    /**
     * Indicates whether some other object is "equal to" this one.
     *
@@ -683,7 +682,8 @@ public class QpidResourceAdapter impleme
 
    private void locateTM() throws ResourceAdapterInternalException
    {
-      if(_raProperties.getTransactionManagerLocatorClass() != null && _raProperties.getTransactionManagerLocatorMethod() != null)
+      if(_raProperties.getTransactionManagerLocatorClass() != null 
+                      && _raProperties.getTransactionManagerLocatorMethod() != null)
       {
 
           String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";");
@@ -703,7 +703,7 @@ public class QpidResourceAdapter impleme
 
       if (_tm == null)
       {
-         _log.error("It wasn't possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
+         _log.error("It was not possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
          throw new ResourceAdapterInternalException("Could not locate javax.transaction.TransactionManager");
       }
       else
@@ -763,6 +763,7 @@ public class QpidResourceAdapter impleme
          final String client = (clientID != null ? clientID : "") ;
 
          final String newurl = AMQConnectionURL.AMQ_PROTOCOL + "://" + username +":" + password + "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ;
+         
          if (_log.isDebugEnabled())
          {
             _log.debug("Initialising connectionURL to " + newurl) ;

Modified: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java?rev=1333473&r1=1333472&r2=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java (original)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java Thu May  3 14:23:03 2012
@@ -21,25 +21,15 @@
 package org.apache.qpid.ra.admin;
 
 import java.io.Externalizable;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.ObjectInput;
-import java.io.ObjectInputStream;
 import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Hashtable;
 
-import javax.naming.Context;
-import javax.naming.Name;
 import javax.naming.NamingException;
-import javax.naming.RefAddr;
 import javax.naming.Reference;
 import javax.naming.StringRefAddr;
-import javax.naming.spi.ObjectFactory;
 
 import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.ra.admin.AdminObjectFactory;
 
 public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable
 {
@@ -101,19 +91,4 @@ public class QpidQueueImpl extends AMQQu
         out.writeObject(this._url);
     }
 
-    //TODO move to tests
-    public static void main(String[] args) throws Exception
-    {
-        QpidQueueImpl q = new QpidQueueImpl();
-        q.setDestinationAddress("hello.Queue;{create:always, node:{type:queue, x-declare:{auto-delete:true}}}");
-        ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("queue.out"));
-        os.writeObject(q);
-        os.close();
-
-
-        ObjectInputStream is = new ObjectInputStream(new FileInputStream("queue.out"));
-        q = (QpidQueueImpl)is.readObject();
-        System.out.println(q);
-
-    }
 }

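The main() method removed above wrote the queue object to a file and read it back. A minimal sketch of the same round trip as an in-memory check follows. It is not the QpidActivationSpecTest or QpidResourceAdapterTest added by this commit: `DemoQueue` is a hypothetical Externalizable stand-in rather than QpidQueueImpl, and only `java.io` classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

class RoundTripSketch
{
    // Hypothetical stand-in for an Externalizable admin object.
    public static class DemoQueue implements Externalizable
    {
        private String destinationAddress;

        // Externalizable requires a public no-argument constructor.
        public DemoQueue()
        {
        }

        public String getDestinationAddress()
        {
            return destinationAddress;
        }

        public void setDestinationAddress(String destinationAddress)
        {
            this.destinationAddress = destinationAddress;
        }

        public void writeExternal(ObjectOutput out) throws IOException
        {
            out.writeObject(destinationAddress);
        }

        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
        {
            destinationAddress = (String) in.readObject();
        }
    }

    // Serializes to a byte array and deserializes again, with no temporary file.
    static DemoQueue roundTrip(DemoQueue original)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes))
            {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                return (DemoQueue) in.readObject();
            }
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args)
    {
        DemoQueue queue = new DemoQueue();
        queue.setDestinationAddress("hello.Queue;{create:always, node:{type:queue, x-declare:{auto-delete:true}}}");
        System.out.println(roundTrip(queue).getDestinationAddress());
    }
}
```

Using byte-array streams keeps the check free of leftover files such as the queue.out written by the removed method.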
Modified: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java?rev=1333473&r1=1333472&r2=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java (original)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java Thu May  3 14:23:03 2012
@@ -20,114 +20,28 @@
  */
 package org.apache.qpid.ra.inflow;
 
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
 import javax.resource.ResourceException;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
 import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkManager;
 
+import org.apache.qpid.ra.QpidResourceAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.XAConnectionImpl;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.ra.QpidResourceAdapter;
-import org.apache.qpid.ra.Util;
-
 /**
  * The activation.
  *
  */
-public class QpidActivation implements ExceptionListener
+public class QpidActivation extends QpidExceptionHandler
 {
-   /**
-    * The logger
-    */
    private static final Logger _log = LoggerFactory.getLogger(QpidActivation.class);
 
-   /**
-    * The onMessage method
-    */
-   public static final Method ONMESSAGE;
-
-   /**
-    * The resource adapter
-    */
-   private final QpidResourceAdapter _ra;
-
-   /**
-    * The activation spec
-    */
-   private final QpidActivationSpec _spec;
-
-   /**
-    * The message endpoint factory
-    */
-   private final MessageEndpointFactory _endpointFactory;
-
-   /**
-    * Whether delivery is active
-    */
-   private final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
-
-   /**
-    * The destination type
-    */
-   private boolean _isTopic = false;
-
-   /**
-    * Is the delivery transacted
-    */
-   private boolean _isDeliveryTransacted;
-
-   private Destination _destination;
-
-   /**
-    * The connection
-    */
-   private Connection _connection;
-
    private final List<QpidMessageHandler> _handlers = new ArrayList<QpidMessageHandler>();
 
-   private AMQConnectionFactory _factory;
-
-   // Whether we are in the failure recovery loop
-   private AtomicBoolean _inFailure = new AtomicBoolean(false);
-
-   //Whether or not we have completed activating
-   private AtomicBoolean _activated = new AtomicBoolean(false);
-
-   static
-   {
-      try
-      {
-         ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
-      }
-      catch (Exception e)
-      {
-         throw new RuntimeException(e);
-      }
-   }
-
+   
    /**
     * Constructor
     *
@@ -140,97 +54,8 @@ public class QpidActivation implements E
                             final MessageEndpointFactory endpointFactory,
                             final QpidActivationSpec spec) throws ResourceException
    {
-      if (_log.isTraceEnabled())
-      {
-         _log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
-      }
-
-      this._ra = ra;
-      this._endpointFactory = endpointFactory;
-      this._spec = spec;
-      try
-      {
-         _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
-      }
-      catch (Exception e)
-      {
-         throw new ResourceException(e);
-      }
-   }
-
-   /**
-    * Get the activation spec
-    *
-    * @return The value
-    */
-   public QpidActivationSpec getActivationSpec()
-   {
-      if (_log.isTraceEnabled())
-      {
-         _log.trace("getActivationSpec()");
-      }
-
-      return _spec;
-   }
-
-   /**
-    * Get the message endpoint factory
-    *
-    * @return The value
-    */
-   public MessageEndpointFactory getMessageEndpointFactory()
-   {
-      if (_log.isTraceEnabled())
-      {
-         _log.trace("getMessageEndpointFactory()");
-      }
-
-      return _endpointFactory;
-   }
-
-   /**
-    * Get whether delivery is transacted
-    *
-    * @return The value
-    */
-   public boolean isDeliveryTransacted()
-   {
-      if (_log.isTraceEnabled())
-      {
-         _log.trace("isDeliveryTransacted()");
-      }
-
-      return _isDeliveryTransacted;
-   }
-
-   /**
-    * Get the work manager
-    *
-    * @return The value
-    */
-   public WorkManager getWorkManager()
-   {
-      if (_log.isTraceEnabled())
-      {
-         _log.trace("getWorkManager()");
-      }
-
-      return _ra.getWorkManager();
-   }
-
-   /**
-    * Is the destination a topic
-    *
-    * @return The value
-    */
-   public boolean isTopic()
-   {
-      if (_log.isTraceEnabled())
-      {
-         _log.trace("isTopic()");
-      }
-
-      return _isTopic;
+      super(ra, spec, endpointFactory);
+      
    }
 
    /**
@@ -267,69 +92,61 @@ public class QpidActivation implements E
     *
     * @throws Exception Thrown if an error occurs
     */
-   protected synchronized void setup() throws Exception
+   public synchronized void setup() throws Exception
    {
       _log.debug("Setting up " + _spec);
-      setupCF();
-
+      setupCF();      
       setupDestination();
-      final AMQConnection amqConnection ;
-      final boolean useLocalTx = _spec.isUseLocalTx() ;
-      final boolean isXA = _isDeliveryTransacted && !useLocalTx ;
-
-      if (isXA)
+      
+      if(!_spec.isUseConnectionPerHandler())
       {
-         amqConnection = (XAConnectionImpl)_factory.createXAConnection() ;
+          setupConnection();
+          _connection.setExceptionListener(this);          
       }
-      else
-      {
-         amqConnection = (AMQConnection)_factory.createConnection() ;
-      }
-
-      amqConnection.setExceptionListener(this) ;
-
+      
       for (int i = 0; i < _spec.getMaxSession(); i++)
       {
-         Session session = null;
-
-         try
-         {
-            if (isXA)
-            {
-               session = _ra.createXASession((XAConnectionImpl)amqConnection) ;
-            }
-            else
-            {
-               session = _ra.createSession((AMQConnection)amqConnection,
-                     _spec.getAcknowledgeModeInt(),
-                     useLocalTx,
-                     _spec.getPrefetchLow(),
-                     _spec.getPrefetchHigh());
-            }
-
-            _log.debug("Using session " + Util.asString(session));
-            QpidMessageHandler handler = new QpidMessageHandler(this, _ra.getTM(), session);
-            handler.setup();
-            _handlers.add(handler);
-         }
-         catch (Exception e)
-         {
-            try
-            {
-               amqConnection.close() ;
-            }
-            catch (Exception e2)
-            {
-               _log.trace("Ignored error closing connection", e2);
-            }
+          try
+          {
+              QpidMessageHandler handler = null;
+              
+              if(_spec.isUseConnectionPerHandler())
+              {
+                  handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM());
+              }
+              else
+              {
+                  handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM(), _connection);
+              }
+              
+              handler.start();
+              _handlers.add(handler);
+          }
+          catch(Exception e)
+          {
+              try
+              {
+                  if(_connection != null)
+                  {
+                      this._connection.close();                      
+                  }
+              }
+              catch (Exception e2)
+              {
+                 _log.trace("Ignored error closing connection", e2);
+              }
+
+              throw e;
 
-            throw e;
-         }
+          }         
+        
       }
 
-      amqConnection.start() ;
-      this._connection = amqConnection ;
-      _activated.set(true);
+      if(!_spec.isUseConnectionPerHandler())
+      {
+          this._connection.start();
+          _activated.set(true);
+      }
 
       _log.debug("Setup complete " + this);
    }
@@ -340,136 +157,17 @@ public class QpidActivation implements E
    protected synchronized void teardown()
    {
       _log.debug("Tearing down " + _spec);
-
-      try
-      {
-         if (_connection != null)
-         {
-            _connection.stop();
-         }
-      }
-      catch (Throwable t)
-      {
-         _log.debug("Error stopping connection " + Util.asString(_connection), t);
-      }
+      
+      super.teardown();
 
       for (QpidMessageHandler handler : _handlers)
       {
-         handler.teardown();
+         handler.stop();
       }
 
-      try
-      {
-         if (_connection != null)
-         {
-            _connection.close();
-         }
-      }
-      catch (Throwable t)
-      {
-         _log.debug("Error closing connection " + Util.asString(_connection), t);
-      }
-      if (_spec.isHasBeenUpdated())
-      {
-         _factory = null;
-      }
       _log.debug("Tearing down complete " + this);
    }
 
-   protected void setupCF() throws Exception
-   {
-      if (_spec.isHasBeenUpdated())
-      {
-         _factory = _ra.createAMQConnectionFactory(_spec);
-      }
-      else
-      {
-         _factory = _ra.getDefaultAMQConnectionFactory();
-      }
-   }
-
-   public Destination getDestination()
-   {
-      return _destination;
-   }
-
-   protected void setupDestination() throws Exception
-   {
-
-      String destinationName = _spec.getDestination();
-      String destinationTypeString = _spec.getDestinationType();
-
-      if (_spec.isUseJNDI())
-      {
-         Context ctx = new InitialContext();
-         _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
-         if (_log.isTraceEnabled())
-         {
-            _log.trace("setupDestination(" + ctx + ")");
-         }
-
-         if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
-         {
-            _log.debug("Destination type defined as " + destinationTypeString);
-
-            Class<? extends Destination> destinationType;
-            if (Topic.class.getName().equals(destinationTypeString))
-            {
-               destinationType = Topic.class;
-               _isTopic = true;
-            }
-            else
-            {
-               destinationType = Queue.class;
-            }
-
-            _log.debug("Retrieving destination " + destinationName +
-                                        " of type " +
-                                        destinationType.getName());
-            _destination = Util.lookup(ctx, destinationName, destinationType);
-
-         }
-         else
-         {
-            _log.debug("Destination type not defined");
-            _log.debug("Retrieving destination " + destinationName +
-                                        " of type " +
-                                        Destination.class.getName());
-
-            _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
-            _isTopic = !(_destination instanceof Queue) ;
-         }
-      }
-      else
-      {
-         _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
-         if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
-         {
-            _log.debug("Destination type defined as " + destinationTypeString);
-            final boolean match ;
-            if (Topic.class.getName().equals(destinationTypeString))
-            {
-               match = (_destination instanceof Topic) ;
-               _isTopic = true;
-            }
-            else
-            {
-               match = (_destination instanceof Queue) ;
-            }
-            if (!match)
-            {
-               throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
-            }
-         }
-         else
-         {
-            _isTopic = !(_destination instanceof Queue) ;
-         }
-      }
-
-      _log.debug("Got destination " + _destination + " from " + destinationName);
-   }
-
    /**
     * Get a string representation
     *
@@ -492,94 +190,7 @@ public class QpidActivation implements E
       return buffer.toString();
    }
 
-   public void onException(final JMSException jmse)
-   {
-       if(_activated.get())
-       {
-           handleFailure(jmse) ;
-       }
-       else
-       {
-           _log.warn("Received JMSException: " + jmse + " while endpoint was not activated.");
-       }
-   }
-
-   /**
-    * Handles any failure by trying to reconnect
-    *
-    * @param failure the reason for the failure
-    */
-   public void handleFailure(Throwable failure)
-   {
-      if(doesNotExist(failure))
-      {
-         _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
-      }
-      else
-      {
-         _log.warn("Failure in Qpid activation " + _spec, failure);
-      }
-      int reconnectCount = 0;
-      int setupAttempts = _spec.getSetupAttempts();
-      long setupInterval = _spec.getSetupInterval();
-
-      // Only enter the failure loop once
-      if (_inFailure.getAndSet(true))
-         return;
-      try
-      {
-         while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
-         {
-            teardown();
-
-            try
-            {
-               Thread.sleep(setupInterval);
-            }
-            catch (InterruptedException e)
-            {
-               _log.debug("Interrupted trying to reconnect " + _spec, e);
-               break;
-            }
-
-            _log.info("Attempting to reconnect " + _spec);
-            try
-            {
-               setup();
-               _log.info("Reconnected with Qpid");
-               break;
-            }
-            catch (Throwable t)
-            {
-               if(doesNotExist(failure))
-               {
-                  _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
-               }
-               else
-               {
-                  _log.error("Unable to reconnect " + _spec, t);
-               }
-            }
-            ++reconnectCount;
-         }
-      }
-      finally
-      {
-         // Leaving failure recovery loop
-         _inFailure.set(false);
-      }
-   }
-
-   /**
-    * Check to see if the failure represents a missing endpoint
-    * @param failure The failure.
-    * @return true if it represents a missing endpoint, false otherwise
-    */
-   private boolean doesNotExist(final Throwable failure)
-   {
-      return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
-   }
-
+    
    /**
     * Handles the setup
     */

Modified: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java?rev=1333473&r1=1333472&r2=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java (original)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java Thu May  3 14:23:03 2012
@@ -28,10 +28,10 @@ import javax.resource.spi.ActivationSpec
 import javax.resource.spi.InvalidPropertyException;
 import javax.resource.spi.ResourceAdapter;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.qpid.ra.ConnectionFactoryProperties;
 import org.apache.qpid.ra.QpidResourceAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The activation spec
@@ -88,6 +88,8 @@ public class QpidActivationSpec extends 
    // undefined by default, default is specified at the RA level in QpidRAProperties
    private Long _setupInterval;
 
+   private Boolean _useConnectionPerHandler;
+
    /**
     * Constructor
     */
@@ -544,6 +546,16 @@ public class QpidActivationSpec extends 
       this._setupInterval = setupInterval;
    }
 
+   public Boolean isUseConnectionPerHandler()
+   {              
+       return (_useConnectionPerHandler == null) ? _ra.isUseConnectionPerHandler() : _useConnectionPerHandler;
+   }
+   
+   public void setUseConnectionPerHandler(Boolean connectionPerHandler)
+   {
+       this._useConnectionPerHandler = connectionPerHandler;                       
+   }
+
    /**
     * Validate
     * @exception InvalidPropertyException Thrown if a validation exception occurs
@@ -561,6 +573,7 @@ public class QpidActivationSpec extends 
       }
    }
 
+   
    /**
     * Get a string representation
     * @return The value
@@ -573,23 +586,30 @@ public class QpidActivationSpec extends 
       buffer.append("ra=").append(_ra);
       buffer.append(" destination=").append(_destination);
       buffer.append(" destinationType=").append(_destinationType);
+      
       if (_messageSelector != null)
       {
          buffer.append(" selector=").append(_messageSelector);
       }
+      
       buffer.append(" ack=").append(getAcknowledgeMode());
       buffer.append(" durable=").append(_subscriptionDurability);
       buffer.append(" clientID=").append(getClientId());
+      
       if (_subscriptionName != null)
       {
          buffer.append(" subscription=").append(_subscriptionName);
       }
+      
       buffer.append(" user=").append(getUserName());
+      
       if (getPassword() != null)
       {
-         buffer.append(" password=").append("****");
+         buffer.append(" password=").append("********");
       }
+      
       buffer.append(" maxSession=").append(_maxSession);
+      
       if (_prefetchLow != null)
       {
          buffer.append(" prefetchLow=").append(_prefetchLow);
@@ -598,7 +618,10 @@ public class QpidActivationSpec extends 
       {
          buffer.append(" prefetchHigh=").append(_prefetchHigh);
       }
+      
+      buffer.append(" connectionPerHandler=").append(isUseConnectionPerHandler());
       buffer.append(')');
+
       return buffer.toString();
    }
 }
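
The isUseConnectionPerHandler() getter added above treats a null Boolean as "not configured on this activation" and falls back to the resource adapter's value. A minimal sketch of that tri-state pattern follows. AdapterDefaults and SpecSettings are hypothetical stand-ins, not the real QpidResourceAdapter and QpidActivationSpec classes.

```java
// Hypothetical stand-in for the adapter-level default (not the real QpidResourceAdapter).
final class AdapterDefaults
{
    private final boolean useConnectionPerHandler;

    AdapterDefaults(boolean useConnectionPerHandler)
    {
        this.useConnectionPerHandler = useConnectionPerHandler;
    }

    boolean isUseConnectionPerHandler()
    {
        return useConnectionPerHandler;
    }
}

// Hypothetical stand-in for the per-activation override (not the real QpidActivationSpec).
final class SpecSettings
{
    private final AdapterDefaults ra;

    // null means "not configured on this activation": defer to the adapter.
    private Boolean useConnectionPerHandler;

    SpecSettings(AdapterDefaults ra)
    {
        this.ra = ra;
    }

    boolean isUseConnectionPerHandler()
    {
        // The null check comes first, so the Boolean is never unboxed while null.
        return (useConnectionPerHandler == null)
                ? ra.isUseConnectionPerHandler()
                : useConnectionPerHandler.booleanValue();
    }

    void setUseConnectionPerHandler(Boolean value)
    {
        this.useConnectionPerHandler = value;
    }
}
```

In this sketch an explicit FALSE on the activation wins over an adapter default of true, and passing null to the setter restores the adapter default.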

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java?rev=1333473&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java Thu May  3 14:23:03 2012
@@ -0,0 +1,339 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.jms.XAConnectionFactory;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ra.QpidResourceAdapter;
+import org.apache.qpid.ra.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class QpidExceptionHandler implements ExceptionListener
+{
+    private static final Logger _log = LoggerFactory.getLogger(QpidExceptionHandler.class);
+
+    public static final Method ONMESSAGE;
+
+    protected final MessageEndpointFactory _endpointFactory;
+
+    protected Connection _connection;
+
+    protected ConnectionFactory _factory;
+
+    protected Destination _destination;
+    
+    protected final QpidResourceAdapter _ra;
+
+    protected final QpidActivationSpec _spec;
+
+    protected boolean _isDeliveryTransacted;
+
+    protected final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
+
+    protected boolean _isTopic = false;
+    
+    // Whether we are in the failure recovery loop
+    protected AtomicBoolean _inFailure = new AtomicBoolean(false);
+
+    //Whether or not we have completed activating
+    protected AtomicBoolean _activated = new AtomicBoolean(false);
+
+    static
+    {
+       try
+       {
+          ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
+       }
+       catch (Exception e)
+       {
+          throw new RuntimeException(e);
+       }
+    }
+   
+    public abstract void setup() throws Exception;
+    public abstract void start() throws Exception;
+    public abstract void stop();
+    
+    protected QpidExceptionHandler(QpidResourceAdapter ra,
+                                   QpidActivationSpec spec,
+                                   MessageEndpointFactory endpointFactory) throws ResourceException
+    {
+        this._ra = ra;
+        this._spec = spec;
+        this._endpointFactory = endpointFactory;
+        
+        try
+        {
+           _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
+        }
+        catch (Exception e)
+        {
+           throw new ResourceException(e);
+        }
+   
+        
+    }
+    
+    public void onException(JMSException e)
+    {
+        if(_activated.get())
+        {
+            handleFailure(e) ;
+        }
+        else
+        {
+            _log.warn("Received JMSException: " + e + " while endpoint was not activated.");
+        }
+    }
+    
+    /**
+     * Handles any failure by trying to reconnect
+     *
+     * @param failure the reason for the failure
+     */
+    public void handleFailure(Throwable failure)
+    {
+       if(doesNotExist(failure))
+       {
+          _log.info("awaiting topic/queue creation " + _spec.getDestination());
+       }
+       else
+       {
+          _log.warn("Failure in Qpid activation " + _spec, failure);
+       }
+       int reconnectCount = 0;
+       int setupAttempts = _spec.getSetupAttempts();
+       long setupInterval = _spec.getSetupInterval();
+
+       // Only enter the failure loop once
+       if (_inFailure.getAndSet(true))
+          return;
+       try
+       {
+          while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
+          {
+             teardown();
+
+             try
+             {
+                Thread.sleep(setupInterval);
+             }
+             catch (InterruptedException e)
+             {
+                _log.debug("Interrupted trying to reconnect " + _spec, e);
+                break;
+             }
+
+             _log.info("Attempting to reconnect " + _spec);
+             try
+             {
+                setup();
+                _log.info("Reconnected with Qpid");
+                break;
+             }
+             catch (Throwable t)
+             {
+                if(doesNotExist(failure))
+                {
+                   _log.info("awaiting topic/queue creation " + _spec.getDestination());
+                }
+                else
+                {
+                   _log.error("Unable to reconnect " + _spec, t);
+                }
+             }
+             ++reconnectCount;
+          }
+       }
+       finally
+       {
+          // Leaving failure recovery loop
+          _inFailure.set(false);
+       }
+    }
+
+    /**
+     * Check to see if the failure represents a missing endpoint
+     * @param failure The failure.
+     * @return true if it represents a missing endpoint, false otherwise
+     */
+    protected boolean doesNotExist(final Throwable failure)
+    {
+       return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
+    }
+
+    protected boolean isXA()
+    {
+        return _isDeliveryTransacted && !_spec.isUseLocalTx();
+    }
+    
+    protected void setupConnection() throws Exception
+    {
+        this._connection = (isXA()) ? ((XAConnectionFactory)_factory).createXAConnection() : _factory.createConnection();        
+    }
+        
+    protected synchronized void teardown()
+    {
+       _log.debug("Tearing down " + _spec);
+
+       try
+       {
+          if (_connection != null)
+          {
+             _connection.stop();
+          }
+       }
+       catch (Throwable t)
+       {
+          _log.debug("Error stopping connection " + Util.asString(_connection), t);
+       }
+
+       try
+       {
+          if (_connection != null)
+          {
+             _connection.close();
+          }
+       }
+       catch (Throwable t)
+       {
+          _log.debug("Error closing connection " + Util.asString(_connection), t);
+       }
+       if (_spec.isHasBeenUpdated())
+       {
+          _factory = null;
+       }
+       _log.debug("Tearing down complete " + this);
+    }
+
+    protected void setupCF() throws Exception
+    {
+       if (_spec.isHasBeenUpdated())
+       {
+          _factory = _ra.createAMQConnectionFactory(_spec);
+       }
+       else
+       {
+          _factory = _ra.getDefaultAMQConnectionFactory();
+       }
+    }
+    
+    protected void setupDestination() throws Exception
+    {
+
+       String destinationName = _spec.getDestination();
+       String destinationTypeString = _spec.getDestinationType();
+
+       if (_spec.isUseJNDI())
+       {
+          Context ctx = new InitialContext();
+          _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
+          if (_log.isTraceEnabled())
+          {
+             _log.trace("setupDestination(" + ctx + ")");
+          }
+
+          if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+          {
+             _log.debug("Destination type defined as " + destinationTypeString);
+
+             Class<? extends Destination> destinationType;
+             if (Topic.class.getName().equals(destinationTypeString))
+             {
+                destinationType = Topic.class;
+                _isTopic = true;
+             }
+             else
+             {
+                destinationType = Queue.class;
+             }
+
+             _log.debug("Retrieving destination " + destinationName +
+                                         " of type " +
+                                         destinationType.getName());
+             _destination = Util.lookup(ctx, destinationName, destinationType);
+
+          }
+          else
+          {
+             _log.debug("Destination type not defined");
+             _log.debug("Retrieving destination " + destinationName +
+                                         " of type " +
+                                         Destination.class.getName());
+
+             _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
+             _isTopic = !(_destination instanceof Queue) ;
+          }
+       }
+       else
+       {
+          _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
+          
+          if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+          {
+             _log.debug("Destination type defined as " + destinationTypeString);
+             final boolean match ;
+             if (Topic.class.getName().equals(destinationTypeString))
+             {
+                match = (_destination instanceof Topic) ;
+                _isTopic = true;
+             }
+             else
+             {
+                match = (_destination instanceof Queue) ;
+             }
+             if (!match)
+             {
+                throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
+             }
+          }
+          else
+          {
+             _isTopic = !(_destination instanceof Queue) ;
+          }
+       }
+
+       _log.debug("Got destination " + _destination + " from " + destinationName);
+    }
+    
+    
+
+}
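
The handleFailure() method above combines a single-entry guard (_inFailure.getAndSet(true)) with a bounded teardown/sleep/setup loop in which a setupAttempts value of -1 means retry without limit. The sketch below isolates that control flow under some assumptions: ReconnectLoop and its recover() signature are hypothetical and not part of the Qpid code base, the return value is added only so the behaviour can be observed, and the catch block is where the most recent exception would be classified.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of the guard-plus-retry pattern; not Qpid API.
final class ReconnectLoop
{
    private final AtomicBoolean inFailure = new AtomicBoolean(false);
    private final AtomicBoolean deliveryActive = new AtomicBoolean(true);

    /**
     * Returns the number of setup attempts made (including a successful one),
     * or -1 if another caller is already inside the recovery loop.
     */
    int recover(Runnable teardown, Callable<Void> setup, int setupAttempts, long intervalMillis)
    {
        // Only one caller may run the recovery loop at a time.
        if (inFailure.getAndSet(true))
        {
            return -1;
        }
        int attempts = 0;
        try
        {
            while (deliveryActive.get() && (setupAttempts == -1 || attempts < setupAttempts))
            {
                teardown.run();
                try
                {
                    Thread.sleep(intervalMillis);
                }
                catch (InterruptedException e)
                {
                    // Preserve the interrupt status for the caller and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
                ++attempts;
                try
                {
                    setup.call();
                    break; // reconnected
                }
                catch (Exception latest)
                {
                    // A real handler would log or classify 'latest' here.
                }
            }
        }
        finally
        {
            // Leaving the recovery loop: allow later failures to trigger it again.
            inFailure.set(false);
        }
        return attempts;
    }

    void deactivate()
    {
        deliveryActive.set(false);
    }
}
```

Resetting the guard in the finally block matters: without it, a loop that exits through the interrupt path would leave the handler unable to recover from any later failure.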

Modified: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java?rev=1333473&r1=1333472&r2=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java (original)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java Thu May  3 14:23:03 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.ra.inflow;
 
+import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -35,6 +36,9 @@ import javax.transaction.Status;
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
 
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.ra.QpidResourceAdapter;
 import org.apache.qpid.ra.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,82 +47,100 @@ import org.slf4j.LoggerFactory;
  * The message handler
  *
  */
-public class QpidMessageHandler implements MessageListener
+public class QpidMessageHandler extends QpidExceptionHandler implements MessageListener
 {
-   /**
-    * The logger
-    */
    private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class);
 
-   /**
-    * The session
-    */
-   private final Session _session;
-
    private MessageConsumer _consumer;
 
-   /**
-    * The endpoint
-    */
    private MessageEndpoint _endpoint;
 
-   private final QpidActivation _activation;
-
-   private boolean _useLocalTx;
-
-   private boolean _transacted;
+   private Session _session;
 
    private final TransactionManager _tm;
 
-   public QpidMessageHandler(final QpidActivation activation,
-                                final TransactionManager tm,
-                                final Session session)
+   public QpidMessageHandler(final QpidResourceAdapter ra,
+                             final QpidActivationSpec spec,
+                             final MessageEndpointFactory endpointFactory,
+                             final TransactionManager tm,
+                             final Connection connection) throws ResourceException
    {
-      this._activation = activation;
-      this._session = session;
+      super(ra, spec, endpointFactory);      
       this._tm = tm;
+      this._connection = connection;
    }
-
+   
+   public QpidMessageHandler(final QpidResourceAdapter ra,
+                             final QpidActivationSpec spec,
+                             final MessageEndpointFactory endpointFactory,
+                             final TransactionManager tm) throws ResourceException
+   {
+       super(ra, spec, endpointFactory);
+       this._tm = tm;
+   }
+   
    public void setup() throws Exception
    {
       if (_log.isTraceEnabled())
       {
          _log.trace("setup()");
       }
-
-      QpidActivationSpec spec = _activation.getActivationSpec();
-      String selector = spec.getMessageSelector();
-
+      
+      setupCF();
+      setupDestination();
+      String selector = _spec.getMessageSelector();
+      
+      if(_spec.isUseConnectionPerHandler())
+      {
+          setupConnection();
+          _connection.setExceptionListener(this);
+      }      
+      
+      if(isXA())
+      {
+          _session = _ra.createXASession((XAConnectionImpl)_connection);          
+      }
+      else
+      {
+          _session = _ra.createSession((AMQConnection)_connection,
+                                      _spec.getAcknowledgeModeInt(),
+                                      _spec.isUseLocalTx(),
+                                      _spec.getPrefetchLow(),
+                                      _spec.getPrefetchHigh());
+      }
       // Create the message consumer
-      if (_activation.isTopic())
+      if (_isTopic)
       {
-         final Topic topic = (Topic) _activation.getDestination();
-         final String subscriptionName = spec.getSubscriptionName();
-         if (spec.isSubscriptionDurable())
-            _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+         final Topic topic = (Topic) _destination;
+         final String subscriptionName = _spec.getSubscriptionName();
+       
+         if (_spec.isSubscriptionDurable())
+         {
+             _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);             
+         }
          else
-            _consumer = _session.createConsumer(topic, selector) ;
+         {
+             _consumer = _session.createConsumer(topic, selector) ;             
+         }
       }
       else
       {
-         final Queue queue = (Queue) _activation.getDestination();
+         final Queue queue = (Queue) _destination;
          _consumer = _session.createConsumer(queue, selector);
       }
 
-      // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX
-      MessageEndpointFactory endpointFactory = _activation.getMessageEndpointFactory();
-      _useLocalTx = _activation.getActivationSpec().isUseLocalTx();
-      _transacted = _activation.isDeliveryTransacted() || _useLocalTx ;
-      if (_activation.isDeliveryTransacted() && !_activation.getActivationSpec().isUseLocalTx())
+      if (isXA())
       {
          final XAResource xaResource = ((XASession)_session).getXAResource() ;
-         _endpoint = endpointFactory.createEndpoint(xaResource);
+         _endpoint = _endpointFactory.createEndpoint(xaResource);
       }
       else
       {
-         _endpoint = endpointFactory.createEndpoint(null);
+         _endpoint = _endpointFactory.createEndpoint(null);
       }
       _consumer.setMessageListener(this);
+      _connection.start();
+      _activated.set(true);
    }
 
    /**
@@ -126,11 +148,13 @@ public class QpidMessageHandler implemen
     */
    public void teardown()
    {
-      if (_log.isTraceEnabled())
-      {
-         _log.trace("teardown()");
-      }
+       if (_log.isTraceEnabled())
+       {
+          _log.trace("teardown()");
+       }
 
+      super.teardown();
+      
       try
       {
          if (_endpoint != null)
@@ -156,27 +180,28 @@ public class QpidMessageHandler implemen
 
       try
       {
-         if (_activation.getActivationSpec().getTransactionTimeout() > 0 && _tm != null)
+         if (_spec.getTransactionTimeout() > 0 && _tm != null)
          {
-            _tm.setTransactionTimeout(_activation.getActivationSpec().getTransactionTimeout());
+            _tm.setTransactionTimeout(_spec.getTransactionTimeout());
          }
 
          _endpoint.beforeDelivery(QpidActivation.ONMESSAGE);
          beforeDelivery = true;
 
-         if(_transacted)
+         if(isXA())
          {
              message.acknowledge();
          }
 
          ((MessageListener)_endpoint).onMessage(message);
 
-         if (_transacted && (_tm.getTransaction() != null))
+         if (isXA() && (_tm.getTransaction() != null))
          {
             final int status = _tm.getStatus() ;
             final boolean rollback = status == Status.STATUS_MARKED_ROLLBACK
                || status == Status.STATUS_ROLLING_BACK
                || status == Status.STATUS_ROLLEDBACK;
+            
             if (rollback)
             {
                _session.recover() ;
@@ -196,7 +221,7 @@ public class QpidMessageHandler implemen
             _log.warn("Unable to call after delivery", e);
             return;
          }
-         if (_useLocalTx)
+         if (!isXA() && _spec.isUseLocalTx())
          {
             _session.commit();
          }
@@ -216,7 +241,7 @@ public class QpidMessageHandler implemen
                _log.warn("Unable to call after delivery", e);
             }
          }
-         if (_useLocalTx || !_activation.isDeliveryTransacted())
+         if (!isXA() && _spec.isUseLocalTx())
          {
             try
             {
@@ -241,5 +266,17 @@ public class QpidMessageHandler implemen
       }
 
    }
+   
+   public void start() throws Exception
+   {
+       _deliveryActive.set(true);
+       setup();
+   }
+   
+   public void stop()
+   {
+       _deliveryActive.set(false);
+       teardown();
+   }
 
 }

Modified: qpid/trunk/qpid/java/jca/src/main/resources/META-INF/ra.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/resources/META-INF/ra.xml?rev=1333473&r1=1333472&r2=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/resources/META-INF/ra.xml (original)
+++ qpid/trunk/qpid/java/jca/src/main/resources/META-INF/ra.xml Thu May  3 14:23:03 2012
@@ -109,6 +109,13 @@
       <config-property-type>java.lang.String</config-property-type>
       <config-property-value>amqp://anonymous:passwd@client/test?brokerlist='tcp://localhost?sasl_mechs='PLAIN''</config-property-value>
     </config-property>
+    
+    <config-property>
+      <description>Use a JMS Connection per MessageHandler</description>
+      <config-property-name>UseConnectionPerHandler</config-property-name>
+      <config-property-type>java.lang.Boolean</config-property-type>
+      <config-property-value>true</config-property-value>
+    </config-property>
 
     <outbound-resourceadapter>
       <connection-definition>

Copied: qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java (from r1333150, qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java?p2=qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java&p1=qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java&r1=1333150&r2=1333473&rev=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java (original)
+++ qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java Thu May  3 14:23:03 2012
@@ -20,46 +20,26 @@
  */
 package org.apache.qpid.ra;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
 import javax.resource.spi.ResourceAdapterInternalException;
 
-import junit.framework.TestCase;
+import org.apache.qpid.ra.inflow.QpidActivationSpec;
 
+import junit.framework.TestCase;
 
-public class QpidResourceAdapterTest extends TestCase
+public class QpidActivationSpecTest extends TestCase
 {
-    public void testGetXAResources() throws Exception
-    {
-        QpidResourceAdapter ra = new QpidResourceAdapter();
-        assertNull(ra.getXAResources(null));
-    }
 
-    public void testTransactionManagerLocatorException() throws Exception
+    public void testActivationSpecBasicSerialization() throws Exception
     {
-
-        QpidResourceAdapter ra = new QpidResourceAdapter();
-        assertNull(ra.getTransactionManagerLocatorClass());
-        assertNull(ra.getTransactionManagerLocatorMethod());
-
-        try
-        {
-            ra.start(null);
-        }
-        catch(ResourceAdapterInternalException e)
-        {
-
-        }
-
-        ra.setTransactionManagerLocatorClass("DummyLocator");
-
-        try
-        {
-            ra.start(null);
-        }
-        catch(ResourceAdapterInternalException e)
-        {
-
-        }
-
+        QpidActivationSpec spec = new QpidActivationSpec();
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(out);
+        oos.writeObject(spec);
+        oos.close();
+        assertTrue(out.toByteArray().length > 0);
     }
 
 }

Modified: qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java?rev=1333473&r1=1333472&r2=1333473&view=diff
==============================================================================
--- qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java (original)
+++ qpid/trunk/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java Thu May  3 14:23:03 2012
@@ -22,6 +22,9 @@ package org.apache.qpid.ra;
 
 import javax.resource.spi.ResourceAdapterInternalException;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
 import junit.framework.TestCase;
 
 
@@ -62,4 +65,14 @@ public class QpidResourceAdapterTest ext
 
     }
 
+    public void testResourceAdapterBasicSerialization() throws Exception
+    {
+
+        QpidResourceAdapter ra = new QpidResourceAdapter();
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(out);
+        oos.writeObject(ra);
+        oos.close();
+        assertTrue(out.toByteArray().length > 0);
+    }
 }
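
The two serialization tests above only check that writeObject() produces some bytes. A fuller check could also read the object back and compare state. The following is a sketch of such a round trip; SerializationRoundTrip and SampleSpec are hypothetical names used for illustration and do not exist in the Qpid sources.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper; not part of the Qpid test suite.
final class SerializationRoundTrip
{
    // Hypothetical serializable bean with one tri-state property.
    static final class SampleSpec implements Serializable
    {
        private static final long serialVersionUID = 1L;
        Boolean useConnectionPerHandler;
    }

    // Serializes the value to a byte array and deserializes a fresh copy from it.
    static <T extends Serializable> T roundTrip(T value)
    {
        try
        {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(out))
            {
                oos.writeObject(value);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(out.toByteArray())))
            {
                @SuppressWarnings("unchecked")
                T copy = (T) ois.readObject();
                return copy;
            }
        }
        catch (IOException | ClassNotFoundException e)
        {
            // Wrapped so callers do not need to declare checked exceptions.
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

A test built on this helper would assert that the copy is a distinct instance and that each configured property survives the round trip.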



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

