qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1463175 - in /qpid/branches/0.22/qpid: ./ cpp/src/ cpp/src/qpid/messaging/amqp/ConnectionContext.cpp cpp/src/qpid/messaging/amqp/ConnectionContext.h cpp/src/qpid/messaging/amqp/SenderHandle.cpp cpp/src/qpid/messaging/amqp/SessionContext.cpp
Date Mon, 01 Apr 2013 16:08:11 GMT
Author: gsim
Date: Mon Apr  1 16:08:11 2013
New Revision: 1463175

URL: http://svn.apache.org/r1463175
Log:
QPID-4674: Detect asynchronous connection close, session end and link detach. Merged from
r1462138.

Modified:
    qpid/branches/0.22/qpid/   (props changed)
    qpid/branches/0.22/qpid/cpp/src/   (props changed)
    qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
    qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
    qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp

Propchange: qpid/branches/0.22/qpid/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid:r1462138

Propchange: qpid/branches/0.22/qpid/cpp/src/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src:r1462138

Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Mon Apr  1 16:08:11 2013
@@ -193,6 +193,7 @@ bool ConnectionContext::fetch(boost::sha
 {
     {
         qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+        checkClosed(ssn, lnk);
         if (!lnk->capacity) {
             pn_link_flow(lnk->receiver, 1);
             wakeupDriver();
@@ -212,7 +213,7 @@ bool ConnectionContext::fetch(boost::sha
             wakeupDriver();
             while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
                 QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
-                wait();
+                wait(ssn, lnk);
             }
             if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
                 pn_link_flow(lnk->receiver, lnk->capacity);
@@ -247,6 +248,7 @@ bool ConnectionContext::get(boost::share
     qpid::sys::AbsTime until(convert(timeout));
     while (true) {
         qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+        checkClosed(ssn, lnk);
         pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver);
         QPID_LOG(debug, "In ConnectionContext::get(), current=" << current);
         if (current) {
@@ -262,7 +264,7 @@ bool ConnectionContext::get(boost::share
             pn_link_advance(lnk->receiver);
             return true;
         } else if (until > qpid::sys::now()) {
-            wait();
+            wait(ssn, lnk);
         } else {
             return false;
         }
@@ -273,6 +275,7 @@ bool ConnectionContext::get(boost::share
 void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    checkClosed(ssn);
     if (message) {
         ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative);
     } else {
@@ -329,19 +332,20 @@ void ConnectionContext::attach(pn_sessio
     }
 }
 
-void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
+void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    checkClosed(ssn);
     SenderContext::Delivery* delivery(0);
     while (!(delivery = snd->send(message))) {
         QPID_LOG(debug, "Waiting for capacity...");
-        wait();//wait for capacity
+        wait(ssn, snd);//wait for capacity
     }
     wakeupDriver();
     if (sync) {
         while (!delivery->accepted()) {
             QPID_LOG(debug, "Waiting for confirmation...");
-            wait();//wait until message has been confirmed
+            wait(ssn, snd);//wait until message has been confirmed
         }
     }
 }
@@ -408,15 +412,65 @@ void ConnectionContext::wakeupDriver()
     }
 }
 
+namespace {
+pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
+pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED;
+}
+
 void ConnectionContext::wait()
 {
     lock.wait();
     if (state == DISCONNECTED) {
         throw qpid::messaging::TransportFailure("Disconnected");
     }
-    //check for any closed links, sessions or indeed the connection
+    if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+        pn_connection_close(connection);
+        throw qpid::messaging::ConnectionError("Connection closed by peer");
+    }
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn)
+{
+    wait();
+    checkClosed(ssn);
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+    wait();
+    checkClosed(ssn, lnk);
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+    wait();
+    checkClosed(ssn, lnk);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
+{
+    if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+        pn_session_close(ssn->session);
+        throw qpid::messaging::SessionError("Session ended by peer");
+    } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) {
+        throw qpid::messaging::SessionError("Session has ended");
+    }
 }
 
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+    checkClosed(ssn, lnk->receiver);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+    checkClosed(ssn, lnk->sender);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_link_t* lnk)
+{
+    checkClosed(ssn);
+    if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+        pn_link_close(lnk);
+        throw qpid::messaging::LinkError("Link detached by peer");
+    } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) {
+        throw qpid::messaging::LinkError("Link is not attached");
+    }
+}
 boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);

Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Mon Apr  1 16:08:11 2013
@@ -72,7 +72,7 @@ class ConnectionContext : public qpid::s
     void endSession(boost::shared_ptr<SessionContext>);
     void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
     void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
-    void send(boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
+    void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
     bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
     bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
     void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
@@ -136,6 +136,13 @@ class ConnectionContext : public qpid::s
     CodecSwitch codecSwitch;
 
     void wait();
+    void wait(boost::shared_ptr<SessionContext>);
+    void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+    void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+    void checkClosed(boost::shared_ptr<SessionContext>);
+    void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+    void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+    void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
     void wakeupDriver();
     void attach(pn_session_t*, pn_link_t*, int credit=0);
 

Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp Mon Apr  1 16:08:11 2013
@@ -39,7 +39,7 @@ SenderHandle::SenderHandle(boost::shared
 
 void SenderHandle::send(const Message& message, bool sync)
 {
-    connection->send(sender, message, sync);
+    connection->send(session, sender, message, sync);
 }
 
 void SenderHandle::close()

Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Mon Apr  1 16:08:11 2013
@@ -156,5 +156,4 @@ bool SessionContext::settled()
     }
     return result;
 }
-
 }}} // namespace qpid::messaging::amqp



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message