activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From djen...@apache.org
Subject svn commit: r732489 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQSession.java ActiveMQXASession.java
Date Wed, 07 Jan 2009 20:57:56 GMT
Author: djencks
Date: Wed Jan  7 12:57:56 2009
New Revision: 732489

URL: http://svn.apache.org/viewvc?rev=732489&view=rev
Log:
AMQ-2034 move delay-session-close-in-xa-tx code to plain session so it works in managed environments.
 Also prevent duplicate synchronization registrations

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=732489&r1=732488&r2=732489&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed
Jan  7 12:57:56 2009
@@ -139,7 +139,7 @@
 	 * acknowledges all messages consumed by a session at when acknowledge()
 	 * is called
 	 */
-	public static final int INDIVIDUAL_ACKNOWLEDGE=4;
+    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
 
     public static interface DeliveryListener {
         void beforeDelivery(ActiveMQSession session, Message msg);
@@ -163,6 +163,7 @@
     protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
 
     protected boolean closed;
+    private volatile boolean synchronizationRegistered;
     protected boolean asyncDispatch;
     protected boolean sessionAsyncDispatch;
     protected final boolean debug;
@@ -553,11 +554,34 @@
      */
     public void close() throws JMSException {
         if (!closed) {
-            dispose();
-            connection.asyncSendPacket(info.createRemoveCommand());
+            if (getTransacted()) {
+                if (!synchronizationRegistered) {
+                    synchronizationRegistered = true;
+                    getTransactionContext().addSynchronization(new Synchronization() {
+
+                                        public void afterCommit() throws Exception {
+                                            doClose();
+                                            synchronizationRegistered = false;
+                                        }
+
+                                        public void afterRollback() throws Exception {
+                                            doClose();
+                                            synchronizationRegistered = false;
+                                        }
+                                    });
+                }
+
+            } else {
+                doClose();
+            }
         }
     }
 
+    private void doClose() throws JMSException {
+        dispose();
+        connection.asyncSendPacket(info.createRemoveCommand());
+    }
+
     void clearMessagesInProgress() {
         executor.clearMessagesInProgress();
         for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();)
{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java?rev=732489&r1=732488&r2=732489&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
Wed Jan  7 12:57:56 2009
@@ -27,7 +27,6 @@
 import javax.transaction.xa.XAResource;
 
 import org.apache.activemq.command.SessionId;
-import org.apache.activemq.transaction.Synchronization;
 
 /**
  * The XASession interface extends the capability of Session by adding access
@@ -97,24 +96,6 @@
         return new ActiveMQTopicSession(this);
     }
 
-    @Override
-    public void close() throws JMSException {
-        if (getTransactionContext().isInXATransaction()) {
-            getTransactionContext().addSynchronization(new Synchronization() {
-                public void afterCommit() throws Exception {
-                    doClose();
-                }
-                
-                public void afterRollback() throws Exception {
-                    doClose();
-                }
-            });
-        }
-    }
-
-    void doClose() throws JMSException {
-        super.close();
-    }
     /**
      * This is called before transacted work is done by
      * the session.  XA Work can only be done when this



Mime
View raw message