qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1622592 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/ConnectionImpl.cpp qpid/client/amqp0_10/SenderImpl.cpp qpid/client/amqp0_10/SessionImpl.cpp qpid/client/amqp0_10/SessionImpl.h tests/ha_test.py tests/ha_tests.py
Date Fri, 05 Sep 2014 01:39:02 GMT
Author: aconway
Date: Fri Sep  5 01:39:01 2014
New Revision: 1622592

URL: http://svn.apache.org/r1622592
Log:
NO-JIRA: HA: Fix ha_tests.py failures with the SWIG 0.10 client.

- Fix unnecessary re-sends in amqp0_10::SenderImpl::replay.
- Throw NotFound and UnauthorizedAccess correctly from amqp0_10::SessionImpl and ConnectionImpl
- Fix ha_test wait_address and valid_address re-using a session after it is closed by NotFound.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1622592&r1=1622591&r2=1622592&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Fri Sep  5 01:39:01 2014
@@ -237,7 +237,7 @@ qpid::messaging::Session ConnectionImpl:
         } catch (const qpid::TransportFailure&) {
             reopen();
         } catch (const qpid::SessionException& e) {
-            throw qpid::messaging::SessionError(e.what());
+            SessionImpl::rethrow(e);
         } catch (const std::exception& e) {
             throw qpid::messaging::MessagingException(e.what());
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=1622592&r1=1622591&r2=1622592&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Fri Sep  5 01:39:01 2014
@@ -153,8 +153,9 @@ void SenderImpl::sendUnreliable(const qp
     sink->send(session, name, msg);
 }
 
-void SenderImpl::replay(const sys::Mutex::ScopedLock&)
+void SenderImpl::replay(const sys::Mutex::ScopedLock& l)
 {
+    checkPendingSends(false, l);
     for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
         i->markRedelivered();
         sink->send(session, name, *i);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1622592&r1=1622591&r2=1622592&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Fri Sep  5 01:39:01 2014
@@ -36,6 +36,7 @@
 #include "qpid/messaging/Sender.h"
 #include "qpid/messaging/Receiver.h"
 #include "qpid/messaging/Session.h"
+#include "qpid/framing/enum.h"
 #include <boost/format.hpp>
 #include <boost/function.hpp>
 #include <boost/intrusive_ptr.hpp>
@@ -79,6 +80,10 @@ void SessionImpl::checkError()
         throw qpid::messaging::TargetCapacityExceeded(e.what());
     } catch (const qpid::framing::UnauthorizedAccessException& e) {
         throw qpid::messaging::UnauthorizedAccess(e.what());
+    } catch (const qpid::framing::NotFoundException& e) {
+        throw qpid::messaging::NotFound(e.what());
+    } catch (const qpid::framing::ResourceDeletedException& e) {
+        throw qpid::messaging::NotFound(e.what());
     } catch (const qpid::SessionException& e) {
         throw qpid::messaging::SessionError(e.what());
     } catch (const qpid::ConnectionException& e) {
@@ -405,10 +410,8 @@ bool SessionImpl::nextReceiver(qpid::mes
         } catch (const qpid::framing::ResourceLimitExceededException& e) {
             if (backoff()) return false;
             else throw qpid::messaging::TargetCapacityExceeded(e.what());
-        } catch (const qpid::framing::UnauthorizedAccessException& e) {
-            throw qpid::messaging::UnauthorizedAccess(e.what());
         } catch (const qpid::SessionException& e) {
-            throw qpid::messaging::SessionError(e.what());
+            rethrow(e);
         } catch (const qpid::ClosedException&) {
             throw qpid::messaging::SessionClosed();
         } catch (const qpid::ConnectionException& e) {
@@ -588,4 +591,16 @@ qpid::messaging::Connection SessionImpl:
     return qpid::messaging::Connection(connection.get());
 }
 
+void SessionImpl::rethrow(const qpid::SessionException& e) {
+    switch (e.code) {
+      case framing::execution::ERROR_CODE_NOT_ALLOWED:
+      case framing::execution::ERROR_CODE_UNAUTHORIZED_ACCESS: throw messaging::UnauthorizedAccess(e.what());
+
+      case framing::execution::ERROR_CODE_NOT_FOUND:
+      case framing::execution::ERROR_CODE_RESOURCE_DELETED: throw messaging::NotFound(e.what());
+
+      default: throw SessionError(e.what());
+    }
+}
+
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=1622592&r1=1622591&r2=1622592&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Fri Sep  5 01:39:01 2014
@@ -109,8 +109,13 @@ class SessionImpl : public qpid::messagi
             else throw qpid::messaging::TargetCapacityExceeded(e.what());
         } catch (const qpid::framing::UnauthorizedAccessException& e) {
             throw qpid::messaging::UnauthorizedAccess(e.what());
+        } catch (const qpid::framing::NotFoundException& e) {
+            throw qpid::messaging::NotFound(e.what());
+        } catch (const qpid::framing::ResourceDeletedException& e) {
+            throw qpid::messaging::NotFound(e.what());
         } catch (const qpid::SessionException& e) {
-            throw qpid::messaging::SessionError(e.what());
+            rethrow(e);
+            return false;       // Keep the compiler happy
         } catch (const qpid::ConnectionException& e) {
             throw qpid::messaging::ConnectionError(e.what());
         } catch (const qpid::ChannelException& e) {
@@ -119,6 +124,7 @@ class SessionImpl : public qpid::messagi
     }
 
     static SessionImpl& convert(qpid::messaging::Session&);
+    static void rethrow(const qpid::SessionException&);
 
   private:
     typedef std::map<std::string, qpid::messaging::Receiver> Receivers;

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1622592&r1=1622591&r2=1622592&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Fri Sep  5 01:39:01 2014
@@ -241,11 +241,11 @@ acl allow all all
 
     def wait_address(self, address):
         """Wait for address to become valid on the broker."""
-        bs = self.connect_admin().session()
-        try: wait_address(bs, address)
-        finally: bs.connection.close()
+        c = self.connect_admin()
+        try: wait_address(c, address)
+        finally: c.close()
 
-    def wait_backup(self, address): self.wait_address(address)
+    wait_backup = wait_address
 
     def browse(self, queue, timeout=0, transform=lambda m: m.content):
         c = self.connect_admin()
@@ -253,21 +253,15 @@ acl allow all all
             return browse(c.session(), queue, timeout, transform)
         finally: c.close()
 
-    def assert_browse(self, queue, expected, **kwargs):
-        """Verify queue contents by browsing."""
-        bs = self.connect().session()
-        try:
-            wait_address(bs, queue)
-            assert_browse_retry(bs, queue, expected, **kwargs)
-        finally: bs.connection.close()
-
     def assert_browse_backup(self, queue, expected, **kwargs):
         """Combines wait_backup and assert_browse_retry."""
-        bs = self.connect_admin().session()
+        c = self.connect_admin()
         try:
-            wait_address(bs, queue)
-            assert_browse_retry(bs, queue, expected, **kwargs)
-        finally: bs.connection.close()
+            wait_address(c, queue)
+            assert_browse_retry(c.session(), queue, expected, **kwargs)
+        finally: c.close()
+
+    assert_browse = assert_browse_backup
 
     def assert_connect_fail(self):
         try:
@@ -384,18 +378,17 @@ class HaCluster(object):
     def __iter__(self): return self._brokers.__iter__()
 
 
-def wait_address(session, address):
+def wait_address(connection, address):
     """Wait for an address to become valid."""
-    def check():
-        try: session.sender(address); return True
-        except  qm.NotFound: return False
-    assert retry(check), "Timed out waiting for address %s"%(address)
+    assert retry(lambda: valid_address(connection, address)), "Timed out waiting for address %s"%(address)
 
-def valid_address(session, address):
+def valid_address(connection, address):
     """Test if an address is valid"""
     try:
-        session.receiver(address)
+        s = connection.session().receiver(address)
+        s.session.close()
         return True
-    except qm.NotFound: return False
+    except qm.NotFound:
+        return False
 
 

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1622592&r1=1622591&r2=1622592&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Sep  5 01:39:01 2014
@@ -82,7 +82,7 @@ class ReplicationTests(HaBrokerTest):
         def verify(b, prefix, p):
             """Verify setup was replicated to backup b"""
             # Wait for configuration to replicate.
-            wait_address(b, prefix+"x");
+            wait_address(b.connection, prefix+"x");
             self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
 
             self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
@@ -90,7 +90,7 @@ class ReplicationTests(HaBrokerTest):
             self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
 
             self.assert_browse_retry(b, prefix+"q2", []) # configuration only
-            assert not valid_address(b, prefix+"q3")
+            assert not valid_address(b.connection, prefix+"q3")
 
             # Verify exchange with replicate=all
             b.sender(prefix+"e1/key1").send(qm.Message(prefix+"e1"))
@@ -104,8 +104,8 @@ class ReplicationTests(HaBrokerTest):
             self.assert_browse_retry(b, prefix+"q4", ["6","7"])
 
             # Verify deletes
-            assert not valid_address(b, prefix+"dq")
-            assert not valid_address(b, prefix+"de")
+            assert not valid_address(b.connection, prefix+"dq")
+            assert not valid_address(b.connection, prefix+"de")
 
         l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
         try:
@@ -130,7 +130,7 @@ class ReplicationTests(HaBrokerTest):
             # Test a series of messages, enqueue all then dequeue all.
             primary.agent.addQueue("foo")
             s = p.sender("foo")
-            wait_address(b, "foo")
+            wait_address(b.connection, "foo")
             msgs = [str(i) for i in range(10)]
             for m in msgs: s.send(qm.Message(m))
             self.assert_browse_retry(p, "foo", msgs)
@@ -168,11 +168,8 @@ class ReplicationTests(HaBrokerTest):
 
         msgs = [str(i) for i in range(30)]
         b1 = backup1.connect_admin().session()
-        wait_address(b1, "q");
-        self.assert_browse_retry(b1, "q", msgs)
-        b2 = backup2.connect_admin().session()
-        wait_address(b2, "q");
-        self.assert_browse_retry(b2, "q", msgs)
+        backup1.assert_browse_backup("q", msgs)
+        backup2.assert_browse_backup("q", msgs)
 
     def test_send_receive(self):
         """Verify sequence numbers of messages sent by qpid-send"""
@@ -556,16 +553,16 @@ class ReplicationTests(HaBrokerTest):
 
         s1 = cluster[1].connect_admin().session()
         cluster[1].wait_backup("q")
-        assert not valid_address(s1, "exad")
-        assert valid_address(s1, "ex")
-        assert valid_address(s1, "ad")
-        assert valid_address(s1, "time")
+        assert not valid_address(s1.connection, "exad")
+        assert valid_address(s1.connection, "ex")
+        assert valid_address(s1.connection, "ad")
+        assert valid_address(s1.connection, "time")
 
         # Verify that auto-delete queues are not kept alive by
         # replicating subscriptions
         ad.close()
         s0.sync()
-        assert not valid_address(s0, "ad")
+        assert not valid_address(s0.connection, "ad")
 
     def test_broker_info(self):
         """Check that broker information is correctly published via management"""



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message