Author: kgiusti
Date: Mon Sep 27 21:41:40 2010
New Revision: 1001917
URL: http://svn.apache.org/viewvc?rev=1001917&view=rev
Log:
QPID-2862: immediately cancel a pending getObjects or method call if the broker disconnects.
Modified:
qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=1001917&r1=1001916&r2=1001917&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Mon Sep 27 21:41:40 2010
@@ -442,15 +442,15 @@ class Object(object):
if seq:
if not sync:
return seq
+ self._broker.cv.acquire()
try:
- self._broker.cv.acquire()
starttime = time()
while self._broker.syncInFlight and self._broker.error == None:
self._broker.cv.wait(timeout)
if time() - starttime > timeout:
- self._session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
finally:
+ self._session.seqMgr._release(seq)
self._broker.cv.release()
if self._broker.error != None:
errorText = self._broker.error
@@ -2497,6 +2497,29 @@ class Broker(Thread):
def _send(self, msg, dest="qpid.management"):
self.amqpSession.message_transfer(destination=dest, message=msg)
+ def _disconnect(self, err_info=None):
+ """ Called when the remote broker has disconnected. Re-initializes all
+ state associated with the broker.
+ """
+ # notify any waiters, and callback
+ self.cv.acquire()
+ try:
+ if err_info is not None:
+ self.error = err_info
+ _agents = self.agents
+ self.agents = {}
+ for agent in _agents.itervalues():
+ agent.close()
+ self.syncInFlight = False
+ self.reqsOutstanding = 0
+ self.cv.notifyAll()
+ finally:
+ self.cv.release()
+
+ if self.session.console:
+ for agent in _agents:
+ self.session.console.delAgent(agent)
+
def _shutdown(self, _timeout=10):
""" Disconnect from a broker, and release its resources. Errors are
ignored.
@@ -2506,6 +2529,10 @@ class Broker(Thread):
self.canceled = True
self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
self.join(_timeout)
+
+ # abort any pending transactions
+ self._disconnect("broker shutdown")
+
try:
if self.amqpSession:
self.amqpSession.close();
@@ -2729,7 +2756,7 @@ class Broker(Thread):
self.cv.acquire()
try:
self.connected = False
- self.error = data
+ self.error = "exception received from messaging layer: %s" % str(data)
finally:
self.cv.release()
self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
@@ -2789,15 +2816,8 @@ class Broker(Thread):
item = None
break
- # notify any waiters, and callback
- self.cv.acquire()
- try:
- edata = self.error;
- if self.syncInFlight:
- self.cv.notify()
- finally:
- self.cv.release()
- self.session._handleError(edata)
+ self._disconnect() # clean up any pending agents
+ self.session._handleError(self.error)
self.session._handleBrokerDisconnect(self)
if not self.session.manageConnections:
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org
|