qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1441161 - in /qpid/trunk/qpid/cpp/src/qpid: ha/ReplicatingSubscription.cpp sys/AsynchIOHandler.cpp
Date Thu, 31 Jan 2013 19:43:27 GMT
Author: aconway
Date: Thu Jan 31 19:43:26 2013
New Revision: 1441161

URL: http://svn.apache.org/viewvc?rev=1441161&view=rev
Log:
QPID-4555: HA Fix race condition in rejecting connections.

Sporadic failure of test_failover_python was caused by a race in rejecting
connections. There was a very small window where work could be done by a
connection after it was rejected.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1441161&r1=1441160&r2=1441161&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Jan 31 19:43:26 2013
@@ -228,15 +228,16 @@ void ReplicatingSubscription::initialize
 }
 
 // Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message&
m) {
-    position = m.getSequence();
+bool ReplicatingSubscription::deliver(
+    const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
+{
     try {
-        QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName()
<< "[" << m.getSequence() << "]");
+        QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence());
         {
             Mutex::ScopedLock l(lock);
-            //FIXME GRS: position is no longer set//assert(position == m.getSequence());
+            position = m.getSequence();
 
-            // m.getSequence() is the position of the newly enqueued message on local queue.
+            // m.getSequence() is the position of the new message on local queue.
             // backupPosition is latest position on backup queue before enqueueing
             if (m.getSequence() <= backupPosition)
                 throw Exception(
@@ -252,7 +253,7 @@ bool ReplicatingSubscription::deliver(co
         }
         return ConsumerImpl::deliver(c, m);
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName()
<< "[" << m.getSequence() << "]"
+        QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence()
                  << ": " << e.what());
         throw;
     }
@@ -280,7 +281,7 @@ void ReplicatingSubscription::cancel()
 // Consumer override, called on primary in the backup's IO thread.
 void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
     // Finish completion of message, it has been acknowledged by the backup.
-    QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName()
<< "[" << r.getMessageId() << "]");
+    QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId());
     guard->complete(r.getMessageId());
     // If next message is protected by the guard then we are ready
     if (r.getMessageId() >= guard->getRange().back) setReady();
@@ -309,7 +310,7 @@ void ReplicatingSubscription::sendDequeu
 // arbitrary connection threads.
 void ReplicatingSubscription::dequeued(const Message& m)
 {
-    QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() <<
"[" << m.getSequence() << "]");
+    QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence());
     {
         Mutex::ScopedLock l(lock);
         dequeues.add(m.getSequence());

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1441161&r1=1441160&r2=1441161&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Thu Jan 31 19:43:26 2013
@@ -97,6 +97,7 @@ void AsynchIOHandler::abort() {
     if (!readError) {
         aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1));
     }
+    aio->queueWriteClose();
 }
 
 void AsynchIOHandler::activateOutput() {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message