qpid-commits mailing list archives

From acon...@apache.org
Subject svn commit: r1616702 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Consumer.h qpid/broker/Queue.cpp qpid/ha/ReplicatingSubscription.cpp qpid/ha/ReplicatingSubscription.h tests/ha_tests.py
Date Fri, 08 Aug 2014 09:23:54 GMT
Author: aconway
Date: Fri Aug  8 09:23:54 2014
New Revision: 1616702

URL: http://svn.apache.org/r1616702
Log:
QPID-5973: HA cluster state may get stuck in recovering

A backup queue is considered "ready" when all messages up to the first guarded
position have either been replicated and acknowledged or dequeued.
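The readiness rule above can be sketched as a predicate over queue positions. This is a minimal model, assuming positions are plain integers and that the backup's acknowledgements and the primary's dequeues are available as sets; the names `Position` and `isReadyByRule` are hypothetical and not part of the Qpid API.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical model: queue positions are plain integers.
using Position = std::uint64_t;

// Returns true if every position in [first, guardFirst) has either been
// replicated and acknowledged by the backup, or dequeued on the primary.
bool isReadyByRule(Position first, Position guardFirst,
                   const std::set<Position>& acknowledged,
                   const std::set<Position>& dequeued) {
    assert(first <= guardFirst);  // the range before the guard must be well-formed
    for (Position p = first; p < guardFirst; ++p) {
        if (acknowledged.count(p) == 0 && dequeued.count(p) == 0)
            return false;  // p is still outstanding, so not ready yet
    }
    return true;
}
```

For example, with the guard starting at position 4, `isReadyByRule(1, 4, {1, 2}, {3})` is true (1 and 2 acknowledged, 3 dequeued), while `isReadyByRule(1, 4, {1}, {3})` is false because position 2 is unaccounted for.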

Previously this was implemented by waiting for the replicating subscription to
advance to the first guarded position and waiting for all expected acks. However,
if messages are dequeued out of order (which happens with transactions), there
can be a gap at the tail of the queue. The replicating subscription will not
advance past this gap because it only advances when there are messages to
consume. This left backups stuck in catch-up. The recovering primary has
a time-out for backups that never reconnect, but if they connect successfully
and don't disconnect, the primary assumes they will become ready and keeps waiting,
leaving the primary stuck in "recovering".
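The failure mode described above can be reproduced in a toy model. This sketch assumes a hypothetical `OldSubscription` whose cursor only moves when the queue holds a message after its current position, and which uses the pre-fix guard test `position + 1 >= guardFirst`; it is an illustration, not the broker's real cursor logic.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

using Position = std::uint64_t;

// Hypothetical model of the pre-fix behaviour: a subscription cursor that
// only moves forward when the queue has a message after its position.
struct OldSubscription {
    Position position = 0;  // last position consumed
    Position guardFirst;    // first guarded position

    explicit OldSubscription(Position g) : guardFirst(g) {}

    // Pre-fix guard test: only true once the cursor reaches the guard.
    bool isGuarded() const { return position + 1 >= guardFirst; }

    // Consume every message currently on the queue after 'position'.
    // Returns the number of messages consumed.
    int drain(const std::set<Position>& queue) {
        int n = 0;
        for (auto it = queue.upper_bound(position); it != queue.end(); ++it) {
            assert(*it > position);  // cursor only ever moves forward
            position = *it;
            ++n;
        }
        return n;
    }
};
```

With the guard at position 6 and positions 4 and 5 dequeued out of order, the queue holds only {1, 2, 3}. `drain` leaves `position` at 3, later calls consume nothing, and `isGuarded()` stays false indefinitely, which is the stuck catch-up state.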

The fix is to notify a replicating subscription when it becomes "stopped"
because there are no more messages available on the queue. This implies that
either it is at the tail OR there are no more messages between it and the tail.
Either way, we should consider it "ready" from the point of view of HA catch-up.
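The fix can be modelled by adding a flag that the queue sets when it has nothing left to dispatch. This is a minimal sketch mirroring the `wasStopped || (position+1 >= guard->getFirst())` check in the diff below; `FixedSubscription`, `advance` and the plain `guardFirst` field are hypothetical simplifications (the real code consults a guard object and also tracks acknowledgements).

```cpp
#include <cassert>
#include <cstdint>

using Position = std::uint64_t;

// Hypothetical model of the fixed guard check: once the queue reports that
// nothing more is available, the subscription counts as guarded even if
// 'position' never reached the first guarded position.
struct FixedSubscription {
    Position position = 0;
    Position guardFirst;
    bool wasStopped = false;

    explicit FixedSubscription(Position g) : guardFirst(g) {}

    // Move the cursor to a later position (cursor never moves backwards).
    void advance(Position p) {
        assert(p > position);
        position = p;
    }

    // Called by the (modelled) queue when no message is available to dispatch.
    void stopped() { wasStopped = true; }

    bool isGuarded() const {
        return wasStopped || position + 1 >= guardFirst;
    }
};
```

In the gap scenario (guard at 6, cursor stuck at 3), `isGuarded()` is false until `stopped()` is called, then true. A subscription that does reach the guard normally is unaffected by the flag.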

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=1616702&r1=1616701&r2=1616702&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Fri Aug  8 09:23:54 2014
@@ -91,6 +91,9 @@ class Consumer : public QueueCursor {
 
     const std::string& getTag() const { return tag; }
 
+    /** Called when there are no more messages immediately available for this consumer on the queue */
+    virtual void stopped() {}
+
   protected:
     //framing::SequenceNumber position;
     const std::string tag;  // <destination> from AMQP 0-10 Message.subscribe command

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1616702&r1=1616701&r2=1616702&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Aug  8 09:23:54 2014
@@ -472,6 +472,7 @@ bool Queue::getNextMessage(Message& m, C
             }
         } else {
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+            c->stopped();
             listeners.addListener(c);
             break;
         }
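The Consumer.h and Queue.cpp changes together add a notification hook: a virtual `stopped()` with an empty default body, called just before the consumer is parked as a listener. The sketch below models that shape with hypothetical `MiniConsumer`, `MiniQueue` and `RecordingConsumer` classes; unlike the real `Queue::getNextMessage`, which handles one message per call with acquisition and filtering, it simply drains everything in a loop.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for broker::Consumer; only the hook is modelled.
class MiniConsumer {
  public:
    virtual ~MiniConsumer() = default;
    virtual void deliver(const std::string&) {}
    // Default is a no-op, so existing consumers are unaffected.
    virtual void stopped() {}
};

// Hypothetical stand-in for broker::Queue.
class MiniQueue {
  public:
    void push(const std::string& m) { messages.push_back(m); }

    // Dispatch messages to 'c' until none remain, then notify it and
    // park it as a listener for future messages.
    void dispatch(MiniConsumer* c) {
        assert(c != nullptr);
        while (!messages.empty()) {
            c->deliver(messages.front());
            messages.pop_front();
        }
        c->stopped();            // the new notification
        listeners.push_back(c);  // existing behaviour: wait for more messages
    }

    std::size_t listenerCount() const { return listeners.size(); }

  private:
    std::deque<std::string> messages;
    std::vector<MiniConsumer*> listeners;
};

// Example override: records deliveries and whether it was stopped.
class RecordingConsumer : public MiniConsumer {
  public:
    void deliver(const std::string& m) override { received.push_back(m); }
    void stopped() override { wasStopped = true; }
    std::vector<std::string> received;
    bool wasStopped = false;
};
```

Keeping the base implementation empty means only subclasses that care, such as a replicating subscription, need to override it.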

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1616702&r1=1616701&r2=1616702&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Aug  8 09:23:54 2014
@@ -107,7 +107,8 @@ ReplicatingSubscription::ReplicatingSubs
     const framing::FieldTable& arguments
 ) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
                  resumeId, resumeTtl, arguments),
-    position(0), ready(false), cancelled(false),
+
+    position(0), wasStopped(false), ready(false), cancelled(false),
     haBroker(hb),
     primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
 {}
@@ -186,10 +187,23 @@ void ReplicatingSubscription::initialize
 
 ReplicatingSubscription::~ReplicatingSubscription() {}
 
+void ReplicatingSubscription::stopped() {
+    Mutex::ScopedLock l(lock);
+    // We have reached the last available message on the queue.
+    //
+    // Note that if messages have been removed out-of-order this may not be the
+    // head of the queue. We may not even have reached the guard
+    // position. However there are no more messages to protect and we will not
+    // be advanced any further, so we should consider ourselves guarded for
+    // purposes of readiness.
+    wasStopped = true;
+    checkReady(l);
+}
 
 // True if the next position for the ReplicatingSubscription is a guarded position.
 bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
-    return position+1 >= guard->getFirst();
+    // See comment in stopped()
+    return wasStopped || (position+1 >= guard->getFirst());
 }
 
 // Message is delivered in the subscription's connection thread.
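`stopped()` takes the subscription lock and then calls `checkReady(l)`, and `isGuarded` takes a `ScopedLock&` it never uses. This follows the common idiom of passing the held lock as a parameter to show that the caller owns it. The sketch below reproduces the idiom with `std::mutex` and `std::lock_guard` in place of Qpid's `sys::Mutex::ScopedLock`. `GuardedState` is a hypothetical class, and it leaves out the acknowledgement tracking (cf. the `unready` set in the header) that the real readiness check presumably also considers.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

using Position = std::uint64_t;

// Hypothetical sketch of the "pass the held lock" idiom. Helpers taking a
// lock_guard reference can only be called by code that already holds 'lock'.
class GuardedState {
  public:
    explicit GuardedState(Position g) : guardFirst(g) {}

    void stopped() {
        std::lock_guard<std::mutex> l(lock);
        wasStopped = true;
        checkReady(l);
    }

    void advance(Position p) {
        std::lock_guard<std::mutex> l(lock);
        assert(p > position);  // cursor only moves forward
        position = p;
        checkReady(l);
    }

    bool isReady() {
        std::lock_guard<std::mutex> l(lock);
        return ready;
    }

  private:
    // The unused lock_guard parameter documents that 'lock' is held.
    bool isGuarded(std::lock_guard<std::mutex>&) const {
        return wasStopped || position + 1 >= guardFirst;
    }

    void checkReady(std::lock_guard<std::mutex>& l) {
        // Not modelled: waiting for outstanding acknowledgements.
        if (!ready && isGuarded(l)) ready = true;
    }

    std::mutex lock;
    Position position = 0;
    Position guardFirst;
    bool wasStopped = false;
    bool ready = false;
};
```

Because `isGuarded` and `checkReady` cannot be called without a `lock_guard` in hand, the compiler prevents accidental unlocked calls from public methods.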

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1616702&r1=1616701&r2=1616702&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Aug  8 09:23:54 2014
@@ -113,6 +113,8 @@ class ReplicatingSubscription :
     void cancel();
     void acknowledged(const broker::DeliveryRecord&);
     bool browseAcquired() const { return true; }
+    void stopped();
+
     // Hide the "queue deleted" error for a ReplicatingSubscription when a
     // queue is deleted, this is normal and not an error.
     bool hideDeletedError() { return true; }
@@ -147,6 +149,7 @@ class ReplicatingSubscription :
     ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
     ReplicationIdSet skip;   // Skip enqueues: messages already on backup and tx enqueues.
     ReplicationIdSet unready;   // Unguarded, replicated and un-acknowledged.
+    bool wasStopped;
     bool ready;
     bool cancelled;
     BrokerInfo info;

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1616702&r1=1616701&r2=1616702&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Aug  8 09:23:54 2014
@@ -20,7 +20,6 @@
 import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
 import traceback
 from qpid.datatypes import uuid4, UUID
-from qpid.harness import Skipped
 from brokertest import *
 from ha_test import *
 from threading import Thread, Lock, Condition
@@ -363,7 +362,9 @@ class ReplicationTests(HaBrokerTest):
             cluster[0].wait_status("ready")
             cluster.bounce(1)
             # FIXME aconway 2014-02-20: pr does not fail over with 1.0/swig
-            if qm == qpid_messaging: raise Skipped("FIXME SWIG client failover bug")
+            if qm == qpid_messaging:
+                print "WARNING: Skipping SWIG client failover bug"
+                return
             self.assertEqual("a", pr.fetch().content)
             pr.session.acknowledge()
             backup.assert_browse_backup("q", ["b"])


