qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shus...@apache.org
Subject svn commit: r1186990 [10/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Date Thu, 20 Oct 2011 18:43:26 GMT
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp Thu Oct 20 18:42:46 2011
@@ -70,14 +70,12 @@ SemanticState::SemanticState(DeliveryAda
       deliveryAdapter(da),
       tagGenerator("sgen"),
       dtxSelected(false),
-      authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
+      authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
       userID(getSession().getConnection().getUserId()),
       userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
       isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())),
       closeComplete(false)
-{
-    acl = getSession().getBroker().getAcl();
-}
+{}
 
 SemanticState::~SemanticState() {
     closed();
@@ -88,7 +86,7 @@ void SemanticState::closed() {
         //prevent requeued messages being redelivered to consumers
         for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
             disable(i->second);
-        }        
+        }
         if (dtxBuffer.get()) {
             dtxBuffer->fail();
         }
@@ -107,16 +105,24 @@ bool SemanticState::exists(const string&
     return consumers.find(consumerTag) != consumers.end();
 }
 
-void SemanticState::consume(const string& tag, 
+namespace {
+    const std::string SEPARATOR("::");
+}
+    
+void SemanticState::consume(const string& tag,
                             Queue::shared_ptr queue, bool ackRequired, bool acquire,
                             bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
 {
-    ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
+    // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
+    // Create a globally unique name so the broker can identify individual consumers
+    std::string name = session.getSessionId().str() + SEPARATOR + tag;
+    ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
     queue->consume(c, exclusive);//may throw exception
     consumers[tag] = c;
 }
 
-void SemanticState::cancel(const string& tag){
+bool SemanticState::cancel(const string& tag)
+{
     ConsumerImplMap::iterator i = consumers.find(tag);
     if (i != consumers.end()) {
         cancel(i->second);
@@ -124,7 +130,13 @@ void SemanticState::cancel(const string&
         //should cancel all unacked messages for this consumer so that
         //they are not redelivered on recovery
         for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
-        
+        //can also remove any records that are now redundant
+        DeliveryRecords::iterator removed =
+            remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
+        unacked.erase(removed, unacked.end());
+        return true;
+    } else {
+        return false;
     }
 }
 
@@ -167,8 +179,8 @@ void SemanticState::startDtx(const std::
     if (!dtxSelected) {
         throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
     }
-    dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
-    txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
+    dtxBuffer.reset(new DtxBuffer(xid));
+    txBuffer = dtxBuffer;
     if (join) {
         mgr.join(xid, dtxBuffer);
     } else {
@@ -194,7 +206,7 @@ void SemanticState::endDtx(const std::st
         dtxBuffer->fail();
     } else {
         dtxBuffer->markEnded();
-    }    
+    }
     dtxBuffer.reset();
 }
 
@@ -236,7 +248,7 @@ void SemanticState::resumeDtx(const std:
 
     checkDtxTimeout();
     dtxBuffer->setSuspended(false);
-    txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
+    txBuffer = dtxBuffer;
 }
 
 void SemanticState::checkDtxTimeout()
@@ -254,31 +266,33 @@ void SemanticState::record(const Deliver
 
 const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, 
-                                          const string& _name, 
-                                          Queue::shared_ptr _queue, 
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
+                                          const string& _name,
+                                          Queue::shared_ptr _queue,
                                           bool ack,
                                           bool _acquire,
                                           bool _exclusive,
+                                          const string& _tag,
                                           const string& _resumeId,
                                           uint64_t _resumeTtl,
                                           const framing::FieldTable& _arguments
 
 
-) : 
-    Consumer(_acquire),
-    parent(_parent), 
-    name(_name), 
-    queue(_queue), 
-    ackExpected(ack), 
+) :
+    Consumer(_name, _acquire),
+    parent(_parent),
+    queue(_queue),
+    ackExpected(ack),
     acquire(_acquire),
-    blocked(true), 
+    blocked(true),
     windowing(true),
+    windowActive(false),
     exclusive(_exclusive),
     resumeId(_resumeId),
+    tag(_tag),
     resumeTtl(_resumeTtl),
     arguments(_arguments),
-    msgCredit(0), 
+    msgCredit(0),
     byteCredit(0),
     notifyEnabled(true),
     syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -289,10 +303,10 @@ SemanticState::ConsumerImpl::ConsumerImp
     {
         ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
         qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
-        
+
         if (agent != 0)
         {
-            mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
+            mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
                                                 !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
             agent->addObject (mgmtObject);
             mgmtObject->set_creditMode("WINDOW");
@@ -324,16 +338,16 @@ bool SemanticState::ConsumerImpl::delive
 {
     assertClusterSafe();
     allocateCredit(msg.payload);
-    DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
+    DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing);
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
     parent->deliver(record, sync);
-    if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
     if (windowing || ackExpected || !acquire) {
         parent->record(record);
-    } 
-    if (acquire && !ackExpected) {
-        queue->dequeue(0, msg);
+    }
+    if (acquire && !ackExpected) {  // auto acquire && auto accept
+        queue->dequeue(0 /*ctxt*/, msg);
+        record.setEnded();
     }
     if (mgmtObject) { mgmtObject->inc_delivered(); }
     return true;
@@ -351,7 +365,7 @@ bool SemanticState::ConsumerImpl::accept
     // checkCredit fails because the message is to big, we should
     // remain on queue's listener list for possible smaller messages
     // in future.
-    // 
+    //
     blocked = !(filter(msg) && checkCredit(msg));
     return !blocked;
 }
@@ -363,7 +377,7 @@ struct ConsumerName {
 };
 
 ostream& operator<<(ostream& o, const ConsumerName& pc) {
-    return o << pc.consumer.getName() << " on "
+    return o << pc.consumer.getTag() << " on "
              << pc.consumer.getParent().getSession().getSessionId();
 }
 }
@@ -372,7 +386,7 @@ void SemanticState::ConsumerImpl::alloca
 {
     assertClusterSafe();
     uint32_t originalMsgCredit = msgCredit;
-    uint32_t originalByteCredit = byteCredit;        
+    uint32_t originalByteCredit = byteCredit;
     if (msgCredit != 0xFFFFFFFF) {
         msgCredit--;
     }
@@ -382,7 +396,7 @@ void SemanticState::ConsumerImpl::alloca
     QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
              << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
              << " now bytes: " << byteCredit << " msgs: " << msgCredit);
-    
+
 }
 
 bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
@@ -396,7 +410,7 @@ bool SemanticState::ConsumerImpl::checkC
     return enoughCredit;
 }
 
-SemanticState::ConsumerImpl::~ConsumerImpl() 
+SemanticState::ConsumerImpl::~ConsumerImpl()
 {
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
@@ -414,7 +428,7 @@ void SemanticState::unsubscribe(Consumer
     Queue::shared_ptr queue = c->getQueue();
     if(queue) {
         queue->cancel(c);
-        if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {            
+        if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
             Queue::tryAutoDelete(session.getBroker(), queue);
         }
     }
@@ -456,23 +470,23 @@ const std::string nullstring;
 }
 
 void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
-    msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
-    
+    msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
+
     std::string exchangeName = msg->getExchangeName();
-    if (!cacheExchange || cacheExchange->getName() != exchangeName)
+    if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
     cacheExchange->setProperties(msg);
 
     /* verify the userid if specified: */
     std::string id =
     	msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
-    
     if (authMsg &&  !id.empty() && !(id == userID || (isDefaultRealm && id == userName)))
     {
         QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
         throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id));
     }
 
+    AclModule* acl = getSession().getBroker().getAcl();
     if (acl && acl->doTransferAcl())
     {
         if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() ))
@@ -484,7 +498,7 @@ void SemanticState::route(intrusive_ptr<
 
     if (!strategy.delivered) {
         //TODO:if discard-unroutable, just drop it
-        //TODO:else if accept-mode is explicit, reject it 
+        //TODO:else if accept-mode is explicit, reject it
         //else route it to alternate exchange
         if (cacheExchange->getAlternate()) {
             cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -513,7 +527,7 @@ void SemanticState::ConsumerImpl::reques
 }
 
 bool SemanticState::complete(DeliveryRecord& delivery)
-{    
+{
     ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
     if (i != consumers.end()) {
         i->second->complete(delivery);
@@ -525,7 +539,7 @@ void SemanticState::ConsumerImpl::comple
 {
     if (!delivery.isComplete()) {
         delivery.complete();
-        if (windowing) {
+        if (windowing && windowActive) {
             if (msgCredit != 0xFFFFFFFF) msgCredit++;
             if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
         }
@@ -541,7 +555,7 @@ void SemanticState::recover(bool requeue
         unacked.clear();
         for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
     }else{
-        for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));        
+        for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
         //unconfirmed messages re redelivered and therefore have their
         //id adjusted, confirmed messages are not and so the ordering
         //w.r.t id is lost
@@ -554,50 +568,61 @@ void SemanticState::deliver(DeliveryReco
     return deliveryAdapter.deliver(msg, sync);
 }
 
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
+const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
 {
-    ConsumerImplMap::iterator i = consumers.find(destination);
-    if (i == consumers.end()) {
-        throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+    ConsumerImpl::shared_ptr consumer;
+    if (!find(destination, consumer)) {
+        throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId()));
     } else {
-        return *(i->second);
+        return consumer;
+    }
+}
+
+bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const
+{
+    // @todo KAG gsim: shouldn't the consumers map be locked????
+    ConsumerImplMap::const_iterator i = consumers.find(destination);
+    if (i == consumers.end()) {
+        return false;
     }
+    consumer = i->second;
+    return true;
 }
 
 void SemanticState::setWindowMode(const std::string& destination)
 {
-    find(destination).setWindowMode();
+    find(destination)->setWindowMode();
 }
 
 void SemanticState::setCreditMode(const std::string& destination)
 {
-    find(destination).setCreditMode();
+    find(destination)->setCreditMode();
 }
 
 void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
 {
-    ConsumerImpl& c = find(destination);
-    c.addByteCredit(value);
-    c.requestDispatch();
+    ConsumerImpl::shared_ptr c = find(destination);
+    c->addByteCredit(value);
+    c->requestDispatch();
 }
 
 
 void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
 {
-    ConsumerImpl& c = find(destination);
-    c.addMessageCredit(value);
-    c.requestDispatch();
+    ConsumerImpl::shared_ptr c = find(destination);
+    c->addMessageCredit(value);
+    c->requestDispatch();
 }
 
 void SemanticState::flush(const std::string& destination)
 {
-    find(destination).flush();
+    find(destination)->flush();
 }
 
 
 void SemanticState::stop(const std::string& destination)
 {
-    find(destination).stop();
+    find(destination)->stop();
 }
 
 void SemanticState::ConsumerImpl::setWindowMode()
@@ -621,6 +646,7 @@ void SemanticState::ConsumerImpl::setCre
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
     assertClusterSafe();
+    if (windowing) windowActive = true;
     if (byteCredit != 0xFFFFFFFF) {
         if (value == 0xFFFFFFFF) byteCredit = value;
         else byteCredit += value;
@@ -630,6 +656,7 @@ void SemanticState::ConsumerImpl::addByt
 void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
     assertClusterSafe();
+    if (windowing) windowActive = true;
     if (msgCredit != 0xFFFFFFFF) {
         if (value == 0xFFFFFFFF) msgCredit = value;
         else msgCredit += value;
@@ -650,7 +677,8 @@ void SemanticState::ConsumerImpl::flush(
 {
     while(haveCredit() && queue->dispatch(shared_from_this()))
         ;
-    stop();
+    msgCredit = 0;
+    byteCredit = 0;
 }
 
 void SemanticState::ConsumerImpl::stop()
@@ -658,6 +686,7 @@ void SemanticState::ConsumerImpl::stop()
     assertClusterSafe();
     msgCredit = 0;
     byteCredit = 0;
+    windowActive = false;
 }
 
 Queue::shared_ptr SemanticState::getQueue(const string& name) const {
@@ -673,7 +702,7 @@ Queue::shared_ptr SemanticState::getQueu
 }
 
 AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
-{   
+{
     return DeliveryRecord::findRange(unacked, first, last);
 }
 
@@ -691,14 +720,21 @@ void SemanticState::release(DeliveryId f
     DeliveryRecords::reverse_iterator start(range.end);
     DeliveryRecords::reverse_iterator end(range.start);
     for_each(start, end, boost::bind(&DeliveryRecord::release, _1, setRedelivered));
+
+    DeliveryRecords::iterator removed =
+        remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
+    unacked.erase(removed, range.end);
 }
 
 void SemanticState::reject(DeliveryId first, DeliveryId last)
 {
     AckRange range = findRange(first, last);
     for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
-    //need to remove the delivery records as well
-    unacked.erase(range.start, range.end);
+    //may need to remove the delivery records as well
+    for (DeliveryRecords::iterator i = range.start; i != unacked.end() && i->getId() <= last; ) {
+        if (i->isRedundant()) i = unacked.erase(i);
+        else i++;
+    }
 }
 
 bool SemanticState::ConsumerImpl::doOutput()
@@ -761,13 +797,13 @@ void SemanticState::accepted(const Seque
         //in transactional mode, don't dequeue or remove, just
         //maintain set of acknowledged messages:
         accumulatedAck.add(commands);
-        
+
         if (dtxBuffer.get()) {
             //if enlisted in a dtx, copy the relevant slice from
             //unacked and record it against that transaction
             TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
             accumulatedAck.clear();
-            dtxBuffer->enlist(txAck);    
+            dtxBuffer->enlist(txAck);
 
             //mark the relevant messages as 'ended' in unacked
             //if the messages are already completed, they can be
@@ -789,7 +825,6 @@ void SemanticState::accepted(const Seque
 }
 
 void SemanticState::completed(const SequenceSet& commands) {
-    assertClusterSafe();
     DeliveryRecords::iterator removed =
         remove_if(unacked.begin(), unacked.end(),
                   isInSequenceSetAnd(commands,
@@ -800,7 +835,6 @@ void SemanticState::completed(const Sequ
 
 void SemanticState::attached()
 {
-    assertClusterSafe();
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
         i->second->enableNotify();
         session.getConnection().outputTasks.addOutputTask(i->second.get());
@@ -810,7 +844,6 @@ void SemanticState::attached()
 
 void SemanticState::detached()
 {
-    assertClusterSafe();
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
         i->second->disableNotify();
         session.getConnection().outputTasks.removeOutputTask(i->second.get());

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h Thu Oct 20 18:42:46 2011
@@ -65,7 +65,7 @@ class SessionContext;
  *
  * Message delivery is driven by ConsumerImpl::doOutput(), which is
  * called when a client's socket is ready to write data.
- * 
+ *
  */
 class SemanticState : private boost::noncopyable {
   public:
@@ -75,14 +75,15 @@ class SemanticState : private boost::non
     {
         mutable qpid::sys::Mutex lock;
         SemanticState* const parent;
-        const std::string name;
         const boost::shared_ptr<Queue> queue;
         const bool ackExpected;
         const bool acquire;
         bool blocked;
         bool windowing;
+        bool windowActive;
         bool exclusive;
         std::string resumeId;
+        const std::string tag;  // <destination> from AMQP 0-10 Message.subscribe command
         uint64_t resumeTtl;
         framing::FieldTable arguments;
         uint32_t msgCredit;
@@ -99,15 +100,16 @@ class SemanticState : private boost::non
       public:
         typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
 
-        ConsumerImpl(SemanticState* parent, 
+        ConsumerImpl(SemanticState* parent,
                      const std::string& name, boost::shared_ptr<Queue> queue,
                      bool ack, bool acquire, bool exclusive,
-                     const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+                     const std::string& tag, const std::string& resumeId,
+                     uint64_t resumeTtl, const framing::FieldTable& arguments);
         ~ConsumerImpl();
         OwnershipToken* getSession();
-        bool deliver(QueuedMessage& msg);            
-        bool filter(boost::intrusive_ptr<Message> msg);            
-        bool accept(boost::intrusive_ptr<Message> msg);            
+        bool deliver(QueuedMessage& msg);
+        bool filter(boost::intrusive_ptr<Message> msg);
+        bool accept(boost::intrusive_ptr<Message> msg);
 
         void disableNotify();
         void enableNotify();
@@ -122,15 +124,13 @@ class SemanticState : private boost::non
         void addMessageCredit(uint32_t value);
         void flush();
         void stop();
-        void complete(DeliveryRecord&);    
+        void complete(DeliveryRecord&);
         boost::shared_ptr<Queue> getQueue() const { return queue; }
         bool isBlocked() const { return blocked; }
         bool setBlocked(bool set) { std::swap(set, blocked); return set; }
 
         bool doOutput();
 
-        std::string getName() const { return name; }
-
         bool isAckExpected() const { return ackExpected; }
         bool isAcquire() const { return acquire; }
         bool isWindowing() const { return windowing; }
@@ -138,6 +138,7 @@ class SemanticState : private boost::non
         uint32_t getMsgCredit() const { return msgCredit; }
         uint32_t getByteCredit() const { return byteCredit; }
         std::string getResumeId() const { return resumeId; };
+        const std::string& getTag() const { return tag; }
         uint64_t getResumeTtl() const { return resumeTtl; }
         const framing::FieldTable& getArguments() const { return arguments; }
 
@@ -148,9 +149,10 @@ class SemanticState : private boost::non
         management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
     };
 
+    typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
+
   private:
     typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
-    typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
 
     SessionContext& session;
     DeliveryAdapter& deliveryAdapter;
@@ -163,7 +165,6 @@ class SemanticState : private boost::non
     DtxBufferMap suspendedXids;
     framing::SequenceSet accumulatedAck;
     boost::shared_ptr<Exchange> cacheExchange;
-    AclModule* acl;
     const bool authMsg;
     const std::string userID;
     const std::string userName;
@@ -181,14 +182,16 @@ class SemanticState : private boost::non
     void disable(ConsumerImpl::shared_ptr);
 
   public:
+
     SemanticState(DeliveryAdapter&, SessionContext&);
     ~SemanticState();
 
     SessionContext& getSession() { return session; }
     const SessionContext& getSession() const { return session; }
 
-    ConsumerImpl& find(const std::string& destination);
-    
+    const ConsumerImpl::shared_ptr find(const std::string& destination) const;
+    bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
+
     /**
      * Get named queue, never returns 0.
      * @return: named queue
@@ -196,16 +199,16 @@ class SemanticState : private boost::non
      * @exception: ConnectionException if name="" and session has no default.
      */
     boost::shared_ptr<Queue> getQueue(const std::string& name) const;
-    
+
     bool exists(const std::string& consumerTag);
 
-    void consume(const std::string& destination, 
-                 boost::shared_ptr<Queue> queue, 
+    void consume(const std::string& destination,
+                 boost::shared_ptr<Queue> queue,
                  bool ackRequired, bool acquire, bool exclusive,
                  const std::string& resumeId=std::string(), uint64_t resumeTtl=0,
                  const framing::FieldTable& = framing::FieldTable());
 
-    void cancel(const std::string& tag);
+    bool cancel(const std::string& tag);
 
     void setWindowMode(const std::string& destination);
     void setCreditMode(const std::string& destination);
@@ -218,12 +221,13 @@ class SemanticState : private boost::non
     void commit(MessageStore* const store);
     void rollback();
     void selectDtx();
+    bool getDtxSelected() const { return dtxSelected; }
     void startDtx(const std::string& xid, DtxManager& mgr, bool join);
     void endDtx(const std::string& xid, bool fail);
     void suspendDtx(const std::string& xid);
     void resumeDtx(const std::string& xid);
     void recover(bool requeue);
-    void deliver(DeliveryRecord& message, bool sync);            
+    void deliver(DeliveryRecord& message, bool sync);
     void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
     void release(DeliveryId first, DeliveryId last, bool setRedelivered);
     void reject(DeliveryId first, DeliveryId last);
@@ -244,9 +248,12 @@ class SemanticState : private boost::non
     DeliveryRecords& getUnacked() { return unacked; }
     framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
     TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
+    DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; }
     void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
+    void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
     void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
     void record(const DeliveryRecord& delivery);
+    DtxBufferMap& getSuspendedXids() { return suspendedXids; }
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp Thu Oct 20 18:42:46 2011
@@ -24,6 +24,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/management/ManagementAgent.h"
+#include "qpid/broker/SessionState.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
 #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -64,53 +65,56 @@ void SessionAdapter::ExchangeHandlerImpl
                                                   const string& alternateExchange, 
                                                   bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
 
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_TYPE, type));
-        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
-        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
-    }
-    
     //TODO: implement autoDelete
     Exchange::shared_ptr alternate;
     if (!alternateExchange.empty()) {
         alternate = getBroker().getExchanges().get(alternateExchange);
     }
     if(passive){
+        AclModule* acl = getBroker().getAcl();
+        if (acl) {
+            //TODO: why does a passive declare require create
+            //permission? The purpose of the passive flag is to state
+            //that the exchange should *not* be created. For
+            //authorisation a passive declare is similar to
+            //exchange-query.
+            std::map<acl::Property, std::string> params;
+            params.insert(make_pair(acl::PROP_TYPE, type));
+            params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+            params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
+            params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+            if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
+                throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId()));
+        }
         Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
         checkType(actual, type);
         checkAlternate(actual, alternate);
-    }else{        
+    }else{
         if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
             throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
         }
         try{
-            std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
-            if (response.second) {
-                if (alternate) {
-                    response.first->setAlternate(alternate);
-                    alternate->incAlternateUsers();
-                }
-                if (durable) {
-                    getBroker().getStore().create(*response.first, args);
-                }
-            } else {
+            std::pair<Exchange::shared_ptr, bool> response =
+                getBroker().createExchange(exchange, type, durable, alternateExchange, args,
+                                           getConnection().getUserId(), getConnection().getUrl());
+            if (!response.second) {
+                //exchange already there, not created
                 checkType(response.first, type);
                 checkAlternate(response.first, alternate);
+                ManagementAgent* agent = getBroker().getManagementAgent();
+                if (agent)
+                    agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
+                                                                 getConnection().getUserId(),
+                                                                 exchange,
+                                                                 type,
+                                                                 alternateExchange,
+                                                                 durable,
+                                                                 false,
+                                                                 ManagementAgent::toMap(args),
+                                                                 "existing"));
             }
-
-            ManagementAgent* agent = getBroker().getManagementAgent();
-            if (agent)
-                agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
-                                                             alternateExchange, durable, false, ManagementAgent::toMap(args),
-                                                             response.second ? "created" : "existing"));
-
         }catch(UnknownExchangeTypeException& /*e*/){
-            throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
+            throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type));
         }
     }
 }
@@ -134,22 +138,8 @@ void SessionAdapter::ExchangeHandlerImpl
                 
 void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
 {
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
-    }
-
-    //TODO: implement unused
-    Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
-    if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
-    if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
-    if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
-    getBroker().getExchanges().destroy(name);
-
-    ManagementAgent* agent = getBroker().getManagementAgent();
-    if (agent)
-        agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name));
+    //TODO: implement if-unused
+    getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl());
 }
 
 ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
@@ -169,67 +159,19 @@ ExchangeQueryResult SessionAdapter::Exch
 }
 
 void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, 
-                                           const string& exchangeName, const string& routingKey, 
-                                           const FieldTable& arguments)
+                                               const string& exchangeName, const string& routingKey, 
+                                               const FieldTable& arguments)
 {
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
-        params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
-
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
-    }
-
-    Queue::shared_ptr queue = getQueue(queueName);
-    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
-    if(exchange){
-        string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
-        if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
-            queue->bound(exchangeName, routingKey, arguments);
-            if (exchange->isDurable() && queue->isDurable()) {
-                getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
-            }
-
-            ManagementAgent* agent = getBroker().getManagementAgent();
-            if (agent)
-                agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
-                                                  queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
-        }
-    }else{
-        throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
-    }
+    getBroker().bind(queueName, exchangeName, routingKey, arguments,
+                     getConnection().getUserId(), getConnection().getUrl());
 }
  
 void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
                                                  const string& exchangeName,
                                                  const string& routingKey)
 {
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
-        params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId()));
-    }
-
-    Queue::shared_ptr queue = getQueue(queueName);
-    if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
-    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
-    if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
-    //TODO: revise unbind to rely solely on binding key (not args)
-    if (exchange->unbind(queue, routingKey, 0)) {
-        if (exchange->isDurable() && queue->isDurable())
-            getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
-
-        ManagementAgent* agent = getBroker().getManagementAgent();
-        if (agent)
-            agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey));
-    }
+    getBroker().unbind(queueName, exchangeName, routingKey,
+                       getConnection().getUserId(), getConnection().getUrl());
 }
 
 ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -332,52 +274,42 @@ QueueQueryResult SessionAdapter::QueueHa
 void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
                                                bool passive, bool durable, bool exclusive, 
                                                bool autoDelete, const qpid::framing::FieldTable& arguments)
-{ 
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
-        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
-        params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
-        params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
-        params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
-        params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
-        params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
-
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
-    }
-
-    Exchange::shared_ptr alternate;
-    if (!alternateExchange.empty()) {
-        alternate = getBroker().getExchanges().get(alternateExchange);
-    }
+{
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
-    queue = getQueue(name);
+        AclModule* acl = getBroker().getAcl();
+        if (acl) {
+            //TODO: why does a passive declare require create
+            //permission? The purpose of the passive flag is to state
+            //that the queue should *not* be created. For
+            //authorisation a passive declare is similar to
+            //queue-query (or indeed a qmf query).
+            std::map<acl::Property, std::string> params;
+            params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+            params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
+            params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+            params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
+            params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+            params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+            params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+            params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+            if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+                throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
+        }
+        queue = getQueue(name);
         //TODO: check alternate-exchange is as expected
     } else {
-        std::pair<Queue::shared_ptr, bool> queue_created =  
-            getBroker().getQueues().declare(name, durable,
-                                            autoDelete,
-                                            exclusive ? &session : 0);
+        std::pair<Queue::shared_ptr, bool> queue_created =
+            getBroker().createQueue(name, durable,
+                                    autoDelete,
+                                    exclusive ? &session : 0,
+                                    alternateExchange,
+                                    arguments,
+                                    getConnection().getUserId(),
+                                    getConnection().getUrl());
         queue = queue_created.first;
         assert(queue);
         if (queue_created.second) { // This is a new queue
-            if (alternate) {
-                queue->setAlternateExchange(alternate);
-                alternate->incAlternateUsers();
-            }
-
-            //apply settings & create persistent record if required
-            try { queue_created.first->create(arguments); }
-            catch (...) { getBroker().getQueues().destroy(name); throw; }
-
-            //add default binding:
-            getBroker().getExchanges().getDefault()->bind(queue, name, 0);
-            queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
-
             //handle automatic cleanup:
             if (exclusive) {
                 exclusiveQueues.push_back(queue);
@@ -386,21 +318,20 @@ void SessionAdapter::QueueHandlerImpl::d
             if (exclusive && queue->setExclusiveOwner(&session)) {
                 exclusiveQueues.push_back(queue);
             }
+            ManagementAgent* agent = getBroker().getManagementAgent();
+            if (agent)
+                agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
+                                                      name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
+                                                      "existing"));
         }
 
-        ManagementAgent* agent = getBroker().getManagementAgent();
-        if (agent)
-            agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
-                                                      name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
-                                                      queue_created.second ? "created" : "existing"));
     }
 
-    if (exclusive && !queue->isExclusiveOwner(&session)) 
+    if (exclusive && !queue->isExclusiveOwner(&session))
         throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
                                                << queue->getName()));
-} 
-        
-        
+}
+
 void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
     AclModule* acl = getBroker().getAcl();
     if (acl)
@@ -409,40 +340,32 @@ void SessionAdapter::QueueHandlerImpl::p
              throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
     }
     getQueue(queue)->purge();
-} 
-        
-void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
-
-    AclModule* acl = getBroker().getAcl();
-    if (acl)
-    {
-         if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) )
-             throw UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId()));
-    }
+}
 
-    Queue::shared_ptr q = getQueue(queue);
-    if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) 
+void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty)
+{
+    if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) {
         throw ResourceLockedException(QPID_MSG("Cannot delete queue "
-                                               << queue << "; it is exclusive to another session"));
-    if(ifEmpty && q->getMessageCount() > 0){
-        throw PreconditionFailedException("Queue not empty.");
-    }else if(ifUnused && q->getConsumerCount() > 0){
-        throw PreconditionFailedException("Queue in use.");
-    }else{
+                                               << queue->getName() << "; it is exclusive to another session"));
+    } else if(ifEmpty && queue->getMessageCount() > 0) {
+        throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
+                                                   << queue->getName() << "; queue not empty"));
+    } else if(ifUnused && queue->getConsumerCount() > 0) {
+        throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
+                                                   << queue->getName() << "; queue in use"));
+    } else if (queue->isExclusiveOwner(&session)) {
         //remove the queue from the list of exclusive queues if necessary
-        if(q->isExclusiveOwner(&getConnection())){
-            QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
-            if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
-        }
-        q->destroy();
-        getBroker().getQueues().destroy(queue);
-        q->unbind(getBroker().getExchanges(), q);
-
-        ManagementAgent* agent = getBroker().getManagementAgent();
-        if (agent)
-            agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
-        q->notifyDeleted();
-    }
+        QueueVector::iterator i = std::find(exclusiveQueues.begin(),
+                                            exclusiveQueues.end(),
+                                            queue);
+        if (i < exclusiveQueues.end()) exclusiveQueues.erase(i);
+    }    
+}
+        
+void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
+{
+    getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
+                            boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
 } 
 
 SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 
@@ -508,7 +431,9 @@ SessionAdapter::MessageHandlerImpl::subs
 void
 SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
 {
-    state.cancel(destination);
+    if (!state.cancel(destination)) {
+        throw NotFoundException(QPID_MSG("No such subscription: " << destination));
+    }
 
     ManagementAgent* agent = getBroker().getManagementAgent();
     if (agent)
@@ -587,7 +512,12 @@ framing::MessageResumeResult SessionAdap
     
 
 
-void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op
+void SessionAdapter::ExecutionHandlerImpl::sync()
+{
+    session.addPendingExecutionSync();
+    /** @todo KAG - need a generic mechanism to allow a command to return a "not completed" status back to SessionState */
+
+}
 
 void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/)
 {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h Thu Oct 20 18:42:46 2011
@@ -138,6 +138,7 @@ class Queue;
         bool isLocal(const ConnectionToken* t) const; 
 
         void destroyExclusiveQueues();
+        void checkDelete(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
         template <class F> void eachExclusiveQueue(F f) 
         { 
             std::for_each(exclusiveQueues.begin(), exclusiveQueues.end(), f);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h Thu Oct 20 18:42:46 2011
@@ -46,6 +46,7 @@ class SessionContext : public OwnershipT
     virtual Broker& getBroker() = 0;
     virtual uint16_t getChannel() const = 0;
     virtual const SessionId& getSessionId() const = 0;
+    virtual void addPendingExecutionSync() = 0;
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp Thu Oct 20 18:42:46 2011
@@ -40,11 +40,6 @@ SessionHandler::SessionHandler(Connectio
 
 SessionHandler::~SessionHandler() {}
 
-namespace {
-ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
-MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
-} // namespace
-
 void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
     // NOTE: must tell the error listener _before_ calling connection.close()
     if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp Thu Oct 20 18:42:46 2011
@@ -25,6 +25,7 @@
 #include "qpid/broker/SessionManager.h"
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/broker/RateFlowcontrol.h"
+#include "qpid/sys/ClusterSafe.h"
 #include "qpid/sys/Timer.h"
 #include "qpid/framing/AMQContentBody.h"
 #include "qpid/framing/AMQHeaderBody.h"
@@ -60,9 +61,9 @@ SessionState::SessionState(
       semanticState(*this, *this),
       adapter(semanticState),
       msgBuilder(&broker.getStore()),
-      enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
       mgmtObject(0),
-      rateFlowcontrol(0)
+      rateFlowcontrol(0),
+      asyncCommandCompleter(new AsyncCommandCompleter(this))
 {
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
@@ -95,6 +96,7 @@ void SessionState::addManagementObject()
 }
 
 SessionState::~SessionState() {
+    asyncCommandCompleter->cancel();
     semanticState.closed();
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
@@ -125,6 +127,7 @@ bool SessionState::isLocal(const Connect
 
 void SessionState::detach() {
     QPID_LOG(debug, getId() << ": detached on broker.");
+    asyncCommandCompleter->detached();
     disableOutput();
     handler = 0;
     if (mgmtObject != 0)
@@ -145,6 +148,7 @@ void SessionState::attach(SessionHandler
         mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
         mgmtObject->set_channelId (h.getChannel());
     }
+    asyncCommandCompleter->attached();
 }
 
 void SessionState::abort() {
@@ -202,15 +206,17 @@ Manageable::status_t SessionState::Manag
 }
 
 void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
+    currentCommandComplete = true;      // assumed, can be overridden by invoker method (this sucks).
     Invoker::Result invocation = invoke(adapter, *method);
-    receiverCompleted(id);
+    if (currentCommandComplete) receiverCompleted(id);
+
     if (!invocation.wasHandled()) {
         throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
     } else if (invocation.hasResult()) {
         getProxy().getExecution().result(id, invocation.getResult());
     }
-    if (method->isSync()) {
-        incomplete.process(enqueuedOp, true);
+
+    if (method->isSync() && currentCommandComplete) {
         sendAcceptAndCompletion();
     }
 }
@@ -253,23 +259,14 @@ void SessionState::handleContent(AMQFram
             header.setEof(false);
             msg->getFrames().append(header);
         }
+        if (broker.isTimestamping())
+            msg->setTimestamp();
         msg->setPublisher(&getConnection());
+        msg->getIngressCompletion().begin();
         semanticState.handle(msg);
         msgBuilder.end();
-
-        if (msg->isEnqueueComplete()) {
-            enqueued(msg);
-        } else {
-            incomplete.add(msg);
-        }
-
-        //hold up execution until async enqueue is complete
-        if (msg->getFrames().getMethod()->isSync()) {
-            incomplete.process(enqueuedOp, true);
-            sendAcceptAndCompletion();
-        } else {
-            incomplete.process(enqueuedOp, false);
-        }
+        IncompleteIngressMsgXfer xfer(this, msg);
+        msg->getIngressCompletion().end(xfer);  // allows msg to complete xfer
     }
 
     // Handle producer session flow control
@@ -319,11 +316,41 @@ void SessionState::sendAcceptAndCompleti
     sendCompletion();
 }
 
-void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+/** Invoked when the given inbound message is finished being processed
+ * by all interested parties (eg. it is done being enqueued to all queues,
+ * its credit has been accounted for, etc).  At this point, msg is considered
+ * by this receiver as 'completed' (as defined by AMQP 0_10)
+ */
+void SessionState::completeRcvMsg(SequenceNumber id,
+                                  bool requiresAccept,
+                                  bool requiresSync)
 {
-    receiverCompleted(msg->getCommandId());
-    if (msg->requiresAccept())
-        accepted.add(msg->getCommandId());
+    // Mark this as a cluster-unsafe scope since it can be called in
+    // journal threads or connection threads as part of asynchronous
+    // command completion.
+    sys::ClusterUnsafeScope cus;
+
+    bool callSendCompletion = false;
+    receiverCompleted(id);
+    if (requiresAccept)
+        // will cause msg's seq to appear in the next message.accept we send.
+        accepted.add(id);
+
+    // Are there any outstanding Execution.Sync commands pending the
+    // completion of this msg?  If so, complete them.
+    while (!pendingExecutionSyncs.empty() &&
+           receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
+        const SequenceNumber id = pendingExecutionSyncs.front();
+        pendingExecutionSyncs.pop();
+        QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed.");
+        receiverCompleted(id);
+        callSendCompletion = true;   // the peer is likely waiting for this completion.
+    }
+
+    // if the sender has requested immediate notification of the completion...
+    if (requiresSync || callSendCompletion) {
+        sendAcceptAndCompletion();
+    }
 }
 
 void SessionState::handleIn(AMQFrame& frame) {
@@ -396,4 +423,176 @@ framing::AMQP_ClientProxy& SessionState:
     return handler->getClusterOrderProxy();
 }
 
+
+// Current received command is an execution.sync command.
+// Complete this command only when all preceding commands have completed.
+// (called via the invoker() in handleCommand() above)
+void SessionState::addPendingExecutionSync()
+{
+    SequenceNumber syncCommandId = receiverGetCurrent();
+    if (receiverGetIncomplete().front() < syncCommandId) {
+        currentCommandComplete = false;
+        pendingExecutionSyncs.push(syncCommandId);
+        asyncCommandCompleter->flushPendingMessages();
+        QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
+    }
+}
+
+
+/** factory for creating a reference-counted IncompleteIngressMsgXfer object
+ * which will be attached to a message that will be completed asynchronously.
+ */
+boost::intrusive_ptr<AsyncCompletion::Callback>
+SessionState::IncompleteIngressMsgXfer::clone()
+{
+    // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
+    // If the client is waiting for the message.transfer completion, flush now to force immediate write to journal.
+    if (requiresSync)
+        msg->flush();
+    else {
+        // otherwise, we need to track this message in order to flush it if an execution.sync arrives
+        // before it has been completed (see flushPendingMessages())
+        pending = true;
+        completerContext->addPendingMessage(msg);
+    }
+
+    return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this));
+}
+
+
+/** Invoked by the asynchronous completer associated with a received
+ * msg that is pending Completion.  May be invoked by the IO thread
+ * (sync == true), or some external thread (!sync).
+ */
+void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
+{
+    if (pending) completerContext->deletePendingMessage(id);
+    if (!sync) {
+        /** note well: this path may execute in any thread.  It is safe to access
+         * the completerContext, since *this holds a reference-counted pointer to it,
+         * but not session!
+         */
+        session = 0;
+        QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
+        completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
+    } else {
+        // this path runs directly from the ac->end() call in handleContent() above,
+        // so *session is definitely valid.
+        if (session->isAttached()) {
+            QPID_LOG(debug, ": receive completed for msg seq=" << id);
+            session->completeRcvMsg(id, requiresAccept, requiresSync);
+        }
+    }
+    completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
+}
+
+
+/** Scheduled from an asynchronous command's completed callback to run on
+ * the IO thread.
+ */
+void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
+{
+    ctxt->completeCommands();
+}
+
+
+/** Track an ingress message that is pending completion */
+void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+    std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
+    bool unique = pendingMsgs.insert(item).second;
+    if (!unique) {
+      assert(false);
+    }
+}
+
+
+/** pending message has completed */
+void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+    pendingMsgs.erase(id);
+}
+
+
+/** done when an execution.sync arrives */
+void SessionState::AsyncCommandCompleter::flushPendingMessages()
+{
+    std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+        pendingMsgs.swap(copy);    // we've only tracked these in case a flush is needed, so nuke 'em now.
+    }
+    // drop lock, so it is safe to call "flush()"
+    for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+         i != copy.end(); ++i) {
+        i->second->flush();
+    }
+}
+
+
+/** mark an ingress Message.Transfer command as completed.
+ * This method must be thread safe - it may run on any thread.
+ */
+void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
+                                                                bool requiresAccept,
+                                                                bool requiresSync)
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+
+    if (session && isAttached) {
+        MessageInfo msg(cmd, requiresAccept, requiresSync);
+        completedMsgs.push_back(msg);
+        if (completedMsgs.size() == 1) {
+            session->getConnection().requestIOProcessing(boost::bind(&schedule,
+                                                                     session->asyncCommandCompleter));
+        }
+    }
+}
+
+
+/** Cause the session to complete all completed commands.
+ * Executes on the IO thread.
+ */
+void SessionState::AsyncCommandCompleter::completeCommands()
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+
+    // when session is destroyed, it clears the session pointer via cancel().
+    if (session && session->isAttached()) {
+        for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
+             msg != completedMsgs.end(); ++msg) {
+            session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
+        }
+    }
+    completedMsgs.clear();
+}
+
+
+/** cancel any pending command completions by clearing the session pointer */
+void SessionState::AsyncCommandCompleter::cancel()
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+    session = 0;
+}
+
+
+/** inform the completer that the session has attached,
+ * allows command completion scheduling from any thread */
+void SessionState::AsyncCommandCompleter::attached()
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+    isAttached = true;
+}
+
+
+/** inform the completer that the session has detached,
+ * disables command completion scheduling from any thread */
+void SessionState::AsyncCommandCompleter::detached()
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+    isAttached = false;
+}
+
 }} // namespace qpid::broker

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h Thu Oct 20 18:42:46 2011
@@ -30,13 +30,15 @@
 #include "qmf/org/apache/qpid/broker/Session.h"
 #include "qpid/broker/SessionAdapter.h"
 #include "qpid/broker/DeliveryAdapter.h"
-#include "qpid/broker/IncompleteMessageList.h"
+#include "qpid/broker/AsyncCompletion.h"
 #include "qpid/broker/MessageBuilder.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SemanticState.h"
+#include "qpid/sys/Monitor.h"
 
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
 
 #include <set>
 #include <vector>
@@ -123,6 +125,10 @@ class SessionState : public qpid::Sessio
 
     const SessionId& getSessionId() const { return getId(); }
 
+    // Used by ExecutionHandler sync command processing.  Notifies
+    // the SessionState of a received Execution.Sync command.
+    void addPendingExecutionSync();
+
     // Used to delay creation of management object for sessions
     // belonging to inter-broker bridges
     void addManagementObject();
@@ -130,7 +136,10 @@ class SessionState : public qpid::Sessio
   private:
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
     void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
-    void enqueued(boost::intrusive_ptr<Message> msg);
+
+    // indicate that the given ingress msg has been completely received by the
+    // broker, and the msg's message.transfer command can be considered completed.
+    void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
 
     void handleIn(framing::AMQFrame& frame);
     void handleOut(framing::AMQFrame& frame);
@@ -156,8 +165,6 @@ class SessionState : public qpid::Sessio
     SemanticState semanticState;
     SessionAdapter adapter;
     MessageBuilder msgBuilder;
-    IncompleteMessageList incomplete;
-    IncompleteMessageList::CompletionListener enqueuedOp;
     qmf::org::apache::qpid::broker::Session* mgmtObject;
     qpid::framing::SequenceSet accepted;
 
@@ -166,6 +173,110 @@ class SessionState : public qpid::Sessio
     boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
     boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
 
+    // sequence numbers for pending received Execution.Sync commands
+    std::queue<SequenceNumber> pendingExecutionSyncs;
+    bool currentCommandComplete;
+
+    /** This class provides a context for completing asynchronous commands in a thread
+     * safe manner.  Asynchronous commands save their completion state in this class.
+     * This class then schedules the completeCommands() method in the IO thread.
+     * While running in the IO thread, completeCommands() may safely complete all
+     * saved commands without the risk of colliding with other operations on this
+     * SessionState.
+     */
+    class AsyncCommandCompleter : public RefCounted {
+    private:
+        SessionState *session;
+        bool isAttached;
+        qpid::sys::Mutex completerLock;
+
+        // special-case message.transfer commands for optimization
+        struct MessageInfo {
+            SequenceNumber cmd; // message.transfer command id
+            bool requiresAccept;
+            bool requiresSync;
+        MessageInfo(SequenceNumber c, bool a, bool s)
+        : cmd(c), requiresAccept(a), requiresSync(s) {}
+        };
+        std::vector<MessageInfo> completedMsgs;
+        // If an ingress message does not require a Sync, we need to
+        // hold a reference to it in case an Execution.Sync command is received and we
+        // have to manually flush the message.
+        std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
+
+        /** complete all pending commands, runs in IO thread */
+        void completeCommands();
+
+        /** for scheduling a run of "completeCommands()" on the IO thread */
+        static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
+
+    public:
+        AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
+        ~AsyncCommandCompleter() {};
+
+        /** track a message pending ingress completion */
+        void addPendingMessage(boost::intrusive_ptr<Message> m);
+        void deletePendingMessage(SequenceNumber id);
+        void flushPendingMessages();
+        /** schedule the processing of a completed ingress message.transfer command */
+        void scheduleMsgCompletion(SequenceNumber cmd,
+                                   bool requiresAccept,
+                                   bool requiresSync);
+        void cancel();  // called by SessionState destructor.
+        void attached();  // called by SessionState on attach()
+        void detached();  // called by SessionState on detach()
+    };
+    boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
+
+    /** Abstract class that represents a single asynchronous command that is
+     * pending completion.
+     */
+    class AsyncCommandContext : public AsyncCompletion::Callback
+    {
+     public:
+        AsyncCommandContext( SessionState *ss, SequenceNumber _id )
+          : id(_id), completerContext(ss->asyncCommandCompleter) {}
+        virtual ~AsyncCommandContext() {}
+
+     protected:
+        SequenceNumber id;
+        boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
+    };
+
+    /** incomplete Message.transfer commands - inbound to broker from client
+     */
+    class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
+    {
+     public:
+        IncompleteIngressMsgXfer( SessionState *ss,
+                                  boost::intrusive_ptr<Message> m )
+          : AsyncCommandContext(ss, m->getCommandId()),
+          session(ss),
+          msg(m),
+          requiresAccept(m->requiresAccept()),
+          requiresSync(m->getFrames().getMethod()->isSync()),
+          pending(false) {}
+        IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
+          : AsyncCommandContext(x.session, x.msg->getCommandId()),
+          session(x.session),
+          msg(x.msg),
+          requiresAccept(x.requiresAccept),
+          requiresSync(x.requiresSync),
+          pending(x.pending) {}
+
+  virtual ~IncompleteIngressMsgXfer() {};
+
+        virtual void completed(bool);
+        virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
+
+     private:
+        SessionState *session;  // only valid if sync flag in callback is true
+        boost::intrusive_ptr<Message> msg;
+        bool requiresAccept;
+        bool requiresSync;
+        bool pending;   // true if msg saved on pending list...
+    };
+
     friend class SessionManager;
 };
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp Thu Oct 20 18:42:46 2011
@@ -28,6 +28,52 @@
 
 namespace qpid {
 namespace broker {
+namespace {
+const qmf::org::apache::qpid::broker::EventQueueThresholdExceeded EVENT("dummy", 0, 0);
+bool isQMFv2(const boost::intrusive_ptr<Message> message)
+{
+    const qpid::framing::MessageProperties* props = message->getProperties<qpid::framing::MessageProperties>();
+    return props && props->getAppId() == "qmf2";
+}
+
+bool isThresholdEvent(const boost::intrusive_ptr<Message> message)
+{
+    if (message->getIsManagementMessage()) {
+        //is this a qmf event? if so is it a threshold event?
+        if (isQMFv2(message)) {
+            const qpid::framing::FieldTable* headers = message->getApplicationHeaders();
+            if (headers && headers->getAsString("qmf.content") == "_event") {
+                //decode as list
+                std::string content = message->getFrames().getContent();
+                qpid::types::Variant::List list;
+                qpid::amqp_0_10::ListCodec::decode(content, list);
+                if (list.empty() || list.front().getType() != qpid::types::VAR_MAP) return false;
+                qpid::types::Variant::Map map = list.front().asMap();
+                try {
+                    std::string eventName = map["_schema_id"].asMap()["_class_name"].asString();
+                    return eventName == EVENT.getEventName();
+                } catch (const std::exception& e) {
+                    QPID_LOG(error, "Error checking for recursive threshold alert: " << e.what());
+                }
+            }
+        } else {
+            std::string content = message->getFrames().getContent();
+            qpid::framing::Buffer buffer(const_cast<char*>(content.data()), content.size());
+            if (buffer.getOctet() == 'A' && buffer.getOctet() == 'M' && buffer.getOctet() == '2' && buffer.getOctet() == 'e') {
+                buffer.getLong();//sequence
+                std::string packageName;
+                buffer.getShortString(packageName);
+                if (packageName != EVENT.getPackageName()) return false;
+                std::string eventName;
+                buffer.getShortString(eventName);
+                return eventName == EVENT.getEventName();
+            }
+        }
+    }
+    return false;
+}
+}
+
 ThresholdAlerts::ThresholdAlerts(const std::string& n,
                                  qpid::management::ManagementAgent& a,
                                  const uint32_t ct,
@@ -44,8 +90,14 @@ void ThresholdAlerts::enqueued(const Que
     if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
         if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
             || qpid::sys::Duration(lastAlert, qpid::sys::now()) > repeatInterval) {
-            agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
+            //Note: Raising an event may result in messages being
+            //enqueued on queues; it may even be that this event
+            //causes a message to be enqueued on the queue we are
+            //tracking, and so we need to avoid recursing
+            if (isThresholdEvent(m.payload)) return;
             lastAlert = qpid::sys::now();
+            agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
+            QPID_LOG(info, "Threshold event triggered for " << name << ", count=" << count << ", size=" << size);
         }
     }
 }
@@ -75,12 +127,12 @@ void ThresholdAlerts::observe(Queue& que
 }
 
 void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                              const qpid::framing::FieldTable& settings)
+                              const qpid::framing::FieldTable& settings, uint16_t limitRatio)
 
 {
     qpid::types::Variant::Map map;
     qpid::amqp_0_10::translate(settings, map);
-    observe(queue, agent, map);
+    observe(queue, agent, map, limitRatio);
 }
 
 template <class T>
@@ -118,19 +170,19 @@ class Option
 };
 
 void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                              const qpid::types::Variant::Map& settings)
+                              const qpid::types::Variant::Map& settings, uint16_t limitRatio)
 
 {
     //Note: aliases are keys defined by java broker
     Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
     repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
 
-    //If no explicit threshold settings were given use 80% of any
-    //limit from the policy.
+    //If no explicit threshold settings were given, use the specified
+    //percentage (limitRatio) of any limit from the policy.
     const QueuePolicy* policy = queue.getPolicy();
-    Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy ? policy->getMaxCount()*0.8 : 0));
+    Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy && limitRatio ? (policy->getMaxCount()*limitRatio/100) : 0));
     countThreshold.addAlias("x-qpid-maximum-message-count");
-    Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy ? policy->getMaxSize()*0.8 : 0));
+    Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy && limitRatio ? (policy->getMaxSize()*limitRatio/100) : 0));
     sizeThreshold.addAlias("x-qpid-maximum-message-size");
 
     observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h Thu Oct 20 18:42:46 2011
@@ -50,14 +50,17 @@ class ThresholdAlerts : public QueueObse
                     const long repeatInterval);
     void enqueued(const QueuedMessage&);
     void dequeued(const QueuedMessage&);
+    void acquired(const QueuedMessage&) {};
+    void requeued(const QueuedMessage&) {};
+
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
                         const uint64_t countThreshold,
                         const uint64_t sizeThreshold,
                         const long repeatInterval);
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                        const qpid::framing::FieldTable& settings);
+                        const qpid::framing::FieldTable& settings, uint16_t limitRatio);
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                        const qpid::types::Variant::Map& settings);
+                        const qpid::types::Variant::Map& settings, uint16_t limitRatio);
   private:
     const std::string name;
     qpid::management::ManagementAgent& agent;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp Thu Oct 20 18:42:46 2011
@@ -221,6 +221,7 @@ TopicExchange::TopicExchange(const std::
 
 bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
 {
+    ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
     string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
     string fedTags(args ? args->getAsString(qpidFedTags) : "");
     string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
@@ -249,21 +250,21 @@ bool TopicExchange::bind(Queue::shared_p
             if (mgmtExchange != 0) {
                 mgmtExchange->inc_bindingCount();
             }
-            QPID_LOG(debug, "Bound key [" << routingPattern << "] to queue " << queue->getName()
-                     << " (origin=" << fedOrigin << ")");
+            QPID_LOG(debug, "Binding key [" << routingPattern << "] to queue " << queue->getName()
+                     << " on exchange " << getName() << " (origin=" << fedOrigin << ")");
         }
     } else if (fedOp == fedOpUnbind) {
-        bool reallyUnbind = false;
-        {
-            RWlock::ScopedWlock l(lock);
-            BindingKey* bk = bindingTree.getBindingKey(routingPattern);
-            if (bk) {
-                propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
-                reallyUnbind = bk->fedBinding.countFedBindings(queue->getName()) == 0;
+        RWlock::ScopedWlock l(lock);
+        BindingKey* bk = getQueueBinding(queue, routingPattern);
+        if (bk) {
+            QPID_LOG(debug, "FedOpUnbind [" << routingPattern << "] from exchange " << getName()
+                     << " on queue=" << queue->getName() << " origin=" << fedOrigin);
+            propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
+            // if this was the last binding for the queue, delete the binding
+            if (bk->fedBinding.countFedBindings(queue->getName()) == 0) {
+                deleteBinding(queue, routingPattern, bk);
             }
         }
-        if (reallyUnbind)
-            unbind(queue, routingPattern, 0);
     } else if (fedOp == fedOpReorigin) {
         /** gather up all the keys that need rebinding in a local vector
          * while holding the lock.  Then propagate once the lock is
@@ -281,20 +282,38 @@ bool TopicExchange::bind(Queue::shared_p
         }
     }
 
+    cc.clearCache(); // clear the cache before we IVE route.
     routeIVE();
     if (propagate)
         propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
     return true;
 }
 
-bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){
+bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args)
+{
+    string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+    QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName()
+             << " on exchange " << getName() << " origin=" << fedOrigin << ")" );
+
+    ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
     RWlock::ScopedWlock l(lock);
     string routingKey = normalize(constRoutingKey);
-    BindingKey* bk = bindingTree.getBindingKey(routingKey);
+    BindingKey* bk = getQueueBinding(queue, routingKey);
     if (!bk) return false;
-    Binding::vector& qv(bk->bindingVector);
-    bool propagate = false;
+    bool propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
+    deleteBinding(queue, routingKey, bk);
+    if (propagate)
+        propagateFedOp(routingKey, string(), fedOpUnbind, string());
+    return true;
+}
+
 
+bool TopicExchange::deleteBinding(Queue::shared_ptr queue,
+                                  const std::string& routingKey,
+                                  BindingKey *bk)
+{
+    // Note well: write lock held by caller
+    Binding::vector& qv(bk->bindingVector);
     Binding::vector::iterator q;
     for (q = qv.begin(); q != qv.end(); q++)
         if ((*q)->queue == queue)
@@ -303,42 +322,55 @@ bool TopicExchange::unbind(Queue::shared
     qv.erase(q);
     assert(nBindings > 0);
     nBindings--;
-    propagate = bk->fedBinding.delOrigin();
+
     if(qv.empty()) {
         bindingTree.removeBindingKey(routingKey);
     }
     if (mgmtExchange != 0) {
         mgmtExchange->dec_bindingCount();
     }
-    QPID_LOG(debug, "Unbound [" << routingKey << "] from queue " << queue->getName());
-
-    if (propagate)
-        propagateFedOp(routingKey, string(), fedOpUnbind, string());
+    QPID_LOG(debug, "Unbound key [" << routingKey << "] from queue " << queue->getName()
+             << " on exchange " << getName());
     return true;
 }
 
-bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern)
+/** Returns a pointer to the BindingKey if the given queue is bound to this
+ * exchange with exactly the given routing pattern; returns 0 otherwise.
+ */
+TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queue, const string& pattern)
 {
     // Note well: lock held by caller....
     BindingKey *bk = bindingTree.getBindingKey(pattern);  // Exact match against binding pattern
-    if (!bk) return false;
+    if (!bk) return 0;
     Binding::vector& qv(bk->bindingVector);
     Binding::vector::iterator q;
     for (q = qv.begin(); q != qv.end(); q++)
         if ((*q)->queue == queue)
             break;
-    return q != qv.end();
+    return (q != qv.end()) ? bk : 0;
 }
 
 void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
 {
     // Note: PERFORMANCE CRITICAL!!!
-    BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+    BindingList b;
+    std::map<std::string, BindingList>::iterator it;
+    {  // only lock the cache for read
+       RWlock::ScopedRlock cl(cacheLock);
+       it = bindingCache.find(routingKey);
+       if (it != bindingCache.end()) {
+           b = it->second;
+       }
+    }
     PreRoute pr(msg, this);
-    BindingsFinderIter bindingsFinder(b);
+    if (!b.get())  // no cache hit
     {
         RWlock::ScopedRlock l(lock);
+    	b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+        BindingsFinderIter bindingsFinder(b);
         bindingTree.iterateMatch(routingKey, bindingsFinder);
+        RWlock::ScopedWlock cwl(cacheLock);
+        bindingCache[routingKey] = b; // update cache
     }
     doRoute(msg, b);
 }
@@ -348,7 +380,7 @@ bool TopicExchange::isBound(Queue::share
     RWlock::ScopedRlock l(lock);
     if (routingKey && queue) {
         string key(normalize(*routingKey));
-        return isBound(queue, key);
+        return getQueueBinding(queue, key) != 0;
     } else if (!routingKey && !queue) {
         return nBindings > 0;
     } else if (routingKey) {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h Thu Oct 20 18:42:46 2011
@@ -56,7 +56,7 @@ class TopicExchange : public virtual Exc
     //    |   +-->d-->...
     //    +-->x-->y-->...
     //
-    class BindingNode {
+    class QPID_BROKER_CLASS_EXTERN BindingNode {
     public:
 
         typedef boost::shared_ptr<BindingNode> shared_ptr;
@@ -135,8 +135,31 @@ class TopicExchange : public virtual Exc
     BindingNode bindingTree;
     unsigned long nBindings;
     qpid::sys::RWlock lock;     // protects bindingTree and nBindings
-
-    bool isBound(Queue::shared_ptr queue, const std::string& pattern);
+    qpid::sys::RWlock cacheLock;     // protects bindingCache
+    std::map<std::string, BindingList> bindingCache; // cache of matched routes.
+    class ClearCache {
+    private:
+        qpid::sys::RWlock* cacheLock;
+        std::map<std::string, BindingList>* bindingCache;
+	bool cleared; 
+    public:
+        ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),
+             bindingCache(bc),cleared(false) {};
+        void clearCache() {
+             qpid::sys::RWlock::ScopedWlock l(*cacheLock);
+             if (!cleared) {
+                 bindingCache->clear();
+                 cleared =true;
+             }
+        };
+        ~ClearCache(){ 
+	     clearCache();
+        };
+    };
+    BindingKey *getQueueBinding(Queue::shared_ptr queue, const std::string& pattern);
+    bool deleteBinding(Queue::shared_ptr queue,
+                       const std::string& routingKey,
+                       BindingKey *bk);
 
     class ReOriginIter;
     class BindingsFinderIter;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp Thu Oct 20 18:42:46 2011
@@ -76,5 +76,5 @@ bool TxBuffer::commitLocal(Transactional
 }
 
 void TxBuffer::accept(TxOpConstVisitor& v) const {
-    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v))); 
+    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp Thu Oct 20 18:42:46 2011
@@ -90,14 +90,7 @@ void TxPublish::deliverTo(const boost::s
 
 void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
 {
-    if (!queue->enqueue(ctxt, msg)){
-        /**
-         * if not store then mark message for ack and deleivery once
-         * commit happens, as async IO will never set it when no store
-         * exists
-         */
-	msg->enqueueComplete();
-    }
+    queue->enqueue(ctxt, msg);
 }
 
 TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message