qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1001917 - /qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
Date Mon, 27 Sep 2010 21:41:40 GMT
Author: kgiusti
Date: Mon Sep 27 21:41:40 2010
New Revision: 1001917

URL: http://svn.apache.org/viewvc?rev=1001917&view=rev
Log:
QPID-2862: immediately cancel a pending getObjects or method call if the broker disconnects.

Modified:
    qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=1001917&r1=1001916&r2=1001917&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Mon Sep 27 21:41:40 2010
@@ -442,15 +442,15 @@ class Object(object):
     if seq:
       if not sync:
         return seq
+      self._broker.cv.acquire()
       try:
-        self._broker.cv.acquire()
         starttime = time()
         while self._broker.syncInFlight and self._broker.error == None:
           self._broker.cv.wait(timeout)
           if time() - starttime > timeout:
-            self._session.seqMgr._release(seq)
             raise RuntimeError("Timed out waiting for method to respond")
       finally:
+        self._session.seqMgr._release(seq)
         self._broker.cv.release()
       if self._broker.error != None:
         errorText = self._broker.error
@@ -2497,6 +2497,29 @@ class Broker(Thread):
   def _send(self, msg, dest="qpid.management"):
     self.amqpSession.message_transfer(destination=dest, message=msg)
 
+  def _disconnect(self, err_info=None):
+    """ Called when the remote broker has disconnected. Re-initializes all
+    state associated with the broker.
+    """
+    # notify any waiters, and callback
+    self.cv.acquire()
+    try:
+      if err_info is not None:
+        self.error = err_info
+      _agents = self.agents
+      self.agents = {}
+      for agent in _agents.itervalues():
+        agent.close()
+      self.syncInFlight = False
+      self.reqsOutstanding = 0
+      self.cv.notifyAll()
+    finally:
+      self.cv.release()
+
+    if self.session.console:
+      for agent in _agents:
+        self.session.console.delAgent(agent)
+
   def _shutdown(self, _timeout=10):
     """ Disconnect from a broker, and release its resources.   Errors are
     ignored.
@@ -2506,6 +2529,10 @@ class Broker(Thread):
       self.canceled = True
       self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
       self.join(_timeout)
+
+    # abort any pending transactions
+    self._disconnect("broker shutdown")
+
     try:
       if self.amqpSession:
         self.amqpSession.close();
@@ -2729,7 +2756,7 @@ class Broker(Thread):
     self.cv.acquire()
     try:
       self.connected = False
-      self.error = data
+      self.error = "exception received from messaging layer: %s" % str(data)
     finally:
       self.cv.release()
     self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
@@ -2789,15 +2816,8 @@ class Broker(Thread):
               item = None
               break
 
-          # notify any waiters, and callback
-          self.cv.acquire()
-          try:
-            edata = self.error;
-            if self.syncInFlight:
-              self.cv.notify()
-          finally:
-            self.cv.release()
-          self.session._handleError(edata)
+          self._disconnect()  # clean up any pending agents
+          self.session._handleError(self.error)
           self.session._handleBrokerDisconnect(self)
 
           if not self.session.manageConnections:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message