qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1227885 - /qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
Date Thu, 05 Jan 2012 22:51:13 GMT
Author: aconway
Date: Thu Jan  5 22:51:13 2012
New Revision: 1227885

URL: http://svn.apache.org/viewvc?rev=1227885&view=rev
Log:
QPID-3603: Code cleanup to make ReplicatingSubscription more readable.

Clarified deliver() and dequeued() logic and locking.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1227885&r1=1227884&r2=1227885&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Jan  5 22:51:13
2012
@@ -103,14 +103,6 @@ ReplicatingSubscription::ReplicatingSubs
 
     QPID_LOG(debug, logPrefix << "Created subscription " << name);
 
-    // Note that broker::Queue::getPosition() returns the sequence
-    // number that will be assigned to the next message *minus 1*.
-
-    // this->backupPosition tracks the position of the remote backup
-    // queue, i.e. the sequence number for the next delivered message
-    // *minus one*
-    backupPosition = 0;
-
     // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
     // so we will start consuming from the lowest numbered message.
     // This is incorrect if the sequence number wraps around, but
@@ -121,22 +113,20 @@ ReplicatingSubscription::ReplicatingSubs
 bool ReplicatingSubscription::deliver(QueuedMessage& m) {
     // Add position events for the subscribed queue, not for the internal event queue.
     if (m.queue && m.queue == getQueue().get()) {
+        sys::Mutex::ScopedLock l(lock);
         assert(position == m.position);
-        {
-             sys::Mutex::ScopedLock l(lock);
-             // this->position is the new position after enqueueing m locally.
-             // this->backupPosition is the backup position before enqueueing m. 
-             assert(position > backupPosition);
-             if (position - backupPosition > 1) {
-                 // Position has advanced because of messages dequeued ahead of us.
-                 SequenceNumber send(position);
-                 --send;   // Send the position before m was enqueued.
-                 sendPositionEvent(send, l); 
-                 QPID_LOG(trace, logPrefix << "Sending position " << send
-                          << ", was " << backupPosition);
-             }
-             backupPosition = position;
+        // m.position is the position of the newly enqueued m on the local queue.
+        // backupPosition is latest position on the backup queue (before enqueueing m.)
+        assert(m.position > backupPosition);
+        if (m.position - backupPosition > 1) {
+            // Position has advanced because of messages dequeued ahead of us.
+            SequenceNumber send(m.position);
+            --send;   // Send the position before m was enqueued.
+            sendPositionEvent(send, l);
+            QPID_LOG(trace, logPrefix << "Sending position " << send
+                     << ", was " << backupPosition);
         }
+        backupPosition = m.position;
         QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
     }
     return ConsumerImpl::deliver(m);
@@ -215,21 +205,25 @@ void ReplicatingSubscription::sendEvent(
 }
 
 // Called after the message has been removed from the deque and under
-// the message lock in the queue. Called in arbitrary connection threads.
+// the messageLock in the queue. Called in arbitrary connection threads.
 void ReplicatingSubscription::dequeued(const QueuedMessage& m)
 {
     QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
     {
         sys::Mutex::ScopedLock l(lock);
         dequeues.add(m.position);
+        // If we have not yet sent this message to the backup, then
+        // complete it now as it will never be accepted.
+
+        // FIXME aconway 2012-01-05: suspect use of position in
+        // foreign connection thread.  Race with deliver() which is
+        // not under the message lock?
+        if (m.position > position) {
+            m.payload->getIngressCompletion().finishCompleter();
+            QPID_LOG(trace, logPrefix << "Completed message " << m.position <<
" early");
+        }
     }
     notify();                   // Ensure a call to doDispatch
-    // FIXME aconway 2011-12-20: not thread safe to access position here,
-    // we're not in the dispatch thread.
-    if (m.position > position) {
-        m.payload->getIngressCompletion().finishCompleter();
-        QPID_LOG(trace, logPrefix << "Completed message " << m.position <<
" early");
-    }
 }
 
 // Called in subscription's connection thread.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message