qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject svn commit: r1145706 - in /qpid/trunk/qpid/cpp/src: qpid/amqp_0_10/SessionHandler.cpp qpid/broker/Bridge.cpp qpid/broker/Bridge.h qpid/broker/Link.cpp tests/federation.py tests/run_federation_tests
Date Tue, 12 Jul 2011 18:29:22 GMT
Author: tross
Date: Tue Jul 12 18:29:22 2011
New Revision: 1145706

URL: http://svn.apache.org/viewvc?rev=1145706&view=rev
Log:
QPID-3352 - Federation bridge doesn't recover from session errors
Applied patch from Jason Dillaman

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/tests/federation.py
    qpid/trunk/qpid/cpp/src/tests/run_federation_tests

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=1145706&r1=1145705&r2=1145706&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Tue Jul 12 18:29:22 2011
@@ -188,9 +188,10 @@ void SessionHandler::detach(const std::s
 void SessionHandler::detached(const std::string& name, uint8_t code) {
     CHECK_NAME(name, "session.detached");
     awaitingDetached = false;
-    if (code != session::DETACH_CODE_NORMAL)
+    if (code != session::DETACH_CODE_NORMAL) {
+        sendReady = receiveReady = false;
         channelException(convert(code), "session.detached from peer.");
-    else {
+    } else {
         handleDetach();
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1145706&r1=1145705&r2=1145706&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Jul 12 18:29:22 2011
@@ -164,6 +164,12 @@ void Bridge::destroy()
     listener(this);
 }
 
+bool Bridge::isSessionReady() const
+{
+    SessionHandler& sessionHandler = conn->getChannel(id);
+    return sessionHandler.ready();
+}
+
 void Bridge::setPersistenceId(uint64_t pId) const
 {
     persistenceId = pId;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=1145706&r1=1145705&r2=1145706&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Tue Jul 12 18:29:22 2011
@@ -59,6 +59,8 @@ public:
     void destroy();
     bool isDurable() { return args.i_durable; }
 
+    bool isSessionReady() const;
+
     management::ManagementObject* GetManagementObject() const;
     management::Manageable::status_t ManagementMethod(uint32_t methodId,
                                                       management::Args& args,

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1145706&r1=1145705&r2=1145706&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue Jul 12 18:29:22 2011
@@ -248,6 +248,19 @@ void Link::ioThreadProcessing()
         return;
     QPID_LOG(debug, "Link::ioThreadProcessing()");
 
+    // check for bridge session errors and recover
+    if (!active.empty()) {
+        Bridges::iterator removed = std::remove_if(
+            active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
+        for (Bridges::iterator i = removed; i != active.end(); ++i) {
+            Bridge::shared_ptr  bridge = *i;
+            bridge->closed();
+            bridge->cancel(*connection);
+            created.push_back(bridge);
+        }
+        active.erase(removed, active.end());
+    }
+
     //process any pending creates and/or cancellations
     if (!created.empty()) {
         for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -296,7 +309,7 @@ void Link::maintenanceVisit ()
             }
         }
     }
-    else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
+    else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 

Modified: qpid/trunk/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=1145706&r1=1145705&r2=1145706&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/federation.py Tue Jul 12 18:29:22 2011
@@ -268,6 +268,63 @@ class FederationTests(TestBase010):
 
         self.verify_cleanup()
 
+    def test_pull_from_queue_recovery(self):
+        session = self.session
+
+        #setup queue on remote broker and add some messages
+        r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+        r_session = r_conn.session("test_pull_from_queue_recovery")
+        r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
+        for i in range(1, 6):
+            dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+            r_session.message_transfer(message=Message(dp, "Message %d" % i))
+
+        #setup queue to receive messages from local broker
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed1", exchange="amq.fanout")
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
+        self.assertEqual(result.status, 0)
+
+        bridge = qmf.getObjects(_class="bridge")[0]
+        sleep(5)
+        
+        #recreate the remote bridge queue to invalidate the bridge session
+        r_session.queue_delete (queue="my-bridge-queue", if_empty=False, if_unused=False)
+        r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
+
+        #add some more messages (i.e. after bridge was created)
+        for i in range(6, 11):
+            dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+            r_session.message_transfer(message=Message(dp, "Message %d" % i))
+
+        for i in range(1, 11):
+            try:
+                msg = queue.get(timeout=5)
+                self.assertEqual("Message %d" % i, msg.body)
+            except Empty:
+                self.fail("Failed to find expected message containing 'Message %d'" % i)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
+
+        self.verify_cleanup()
+
     def test_tracing_automatic(self):
         remoteUrl = "%s:%d" % (self.remote_host(), self.remote_port())
         self.startQmf()

Modified: qpid/trunk/qpid/cpp/src/tests/run_federation_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_federation_tests?rev=1145706&r1=1145705&r2=1145706&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_federation_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_federation_tests Tue Jul 12 18:29:22 2011
@@ -55,7 +55,7 @@ stop_brokers() {
 if test -d ${PYTHON_DIR} ;  then
     start_brokers
     echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT $REMOTE_B1 $REMOTE_B2"
-    $QPID_PYTHON_TEST -m federation $SKIPTESTS -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@
+    $QPID_PYTHON_TEST -m federation "$SKIPTESTS" -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@
     RETCODE=$?
     stop_brokers
     if test x$RETCODE != x0; then



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message