qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1160304 - /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
Date Mon, 22 Aug 2011 15:31:09 GMT
Author: kwall
Date: Mon Aug 22 15:31:08 2011
New Revision: 1160304

URL: http://svn.apache.org/viewvc?rev=1160304&view=rev
Log:
QPID-2904: Avoid race condition between SubFlushRunner and MinaAcceptor threads. Send lock
was not being held during MessageStop processing.

Applying patch by Robbie Gemmell and myself.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1160304&r1=1160303&r2=1160304&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java	(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java	Mon Aug 22 15:31:08 2011
@@ -109,15 +109,15 @@ public class Subscription_0_10 implement
     private final MessageAcquireMode _acquireMode;
     private MessageFlowMode _flowMode;
     private final ServerSession _session;
-    private AtomicBoolean _stopped = new AtomicBoolean(true);
+    private final AtomicBoolean _stopped = new AtomicBoolean(true);
     private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
 
     private LogActor _logActor;
-    private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+    private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
     private UUID _id;
     private String _traceExclude;
     private String _trace;
-    private long _createTime = System.currentTimeMillis();
+    private final long _createTime = System.currentTimeMillis();
     private final AtomicLong _deliveredCount = new AtomicLong(0);
     private final Map<String, Object> _arguments;
 
@@ -727,13 +727,22 @@ public class Subscription_0_10 implement
 
     public void stop()
     {
-        if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+        try
         {
-            _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+            getSendLock();
+
+            if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+            {
+                _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+            }
+            _stopped.set(true);
+            FlowCreditManager_0_10 creditManager = getCreditManager();
+            creditManager.clearCredit();
+        }
+        finally
+        {
+            releaseSendLock();
         }
-        _stopped.set(true);
-        FlowCreditManager_0_10 creditManager = getCreditManager();
-        creditManager.clearCredit();
     }
 
     public void addCredit(MessageCreditUnit unit, long value)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message