qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1081634 [3/9] - in /qpid/branches/qpid-2920/qpid: ./ bin/ cpp/ cpp/bindings/qpid/ cpp/bindings/qpid/perl/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/examples/ cpp/examples/direct/ cpp/examples/failover/ cpp/examples/fanout/ cpp/...
Date Tue, 15 Mar 2011 01:54:18 GMT
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Mar 15 01:54:07 2011
@@ -88,7 +88,7 @@ void SemanticState::closed() {
         //prevent requeued messages being redelivered to consumers
         for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
             disable(i->second);
-        }        
+        }
         if (dtxBuffer.get()) {
             dtxBuffer->fail();
         }
@@ -107,7 +107,7 @@ bool SemanticState::exists(const string&
     return consumers.find(consumerTag) != consumers.end();
 }
 
-void SemanticState::consume(const string& tag, 
+void SemanticState::consume(const string& tag,
                             Queue::shared_ptr queue, bool ackRequired, bool acquire,
                             bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
 {
@@ -116,7 +116,8 @@ void SemanticState::consume(const string
     consumers[tag] = c;
 }
 
-void SemanticState::cancel(const string& tag){
+bool SemanticState::cancel(const string& tag)
+{
     ConsumerImplMap::iterator i = consumers.find(tag);
     if (i != consumers.end()) {
         cancel(i->second);
@@ -124,7 +125,9 @@ void SemanticState::cancel(const string&
         //should cancel all unacked messages for this consumer so that
         //they are not redelivered on recovery
         for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
-        
+        return true;
+    } else {
+        return false;
     }
 }
 
@@ -194,7 +197,7 @@ void SemanticState::endDtx(const std::st
         dtxBuffer->fail();
     } else {
         dtxBuffer->markEnded();
-    }    
+    }
     dtxBuffer.reset();
 }
 
@@ -254,9 +257,9 @@ void SemanticState::record(const Deliver
 
 const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, 
-                                          const string& _name, 
-                                          Queue::shared_ptr _queue, 
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
+                                          const string& _name,
+                                          Queue::shared_ptr _queue,
                                           bool ack,
                                           bool _acquire,
                                           bool _exclusive,
@@ -265,20 +268,20 @@ SemanticState::ConsumerImpl::ConsumerImp
                                           const framing::FieldTable& _arguments
 
 
-) : 
+) :
     Consumer(_acquire),
-    parent(_parent), 
-    name(_name), 
-    queue(_queue), 
-    ackExpected(ack), 
+    parent(_parent),
+    name(_name),
+    queue(_queue),
+    ackExpected(ack),
     acquire(_acquire),
-    blocked(true), 
+    blocked(true),
     windowing(true),
     exclusive(_exclusive),
     resumeId(_resumeId),
     resumeTtl(_resumeTtl),
     arguments(_arguments),
-    msgCredit(0), 
+    msgCredit(0),
     byteCredit(0),
     notifyEnabled(true),
     syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -289,7 +292,7 @@ SemanticState::ConsumerImpl::ConsumerImp
     {
         ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
         qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
-        
+
         if (agent != 0)
         {
             mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
@@ -331,7 +334,7 @@ bool SemanticState::ConsumerImpl::delive
     if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
     if (windowing || ackExpected || !acquire) {
         parent->record(record);
-    } 
+    }
     if (acquire && !ackExpected) {
         queue->dequeue(0, msg);
     }
@@ -351,7 +354,7 @@ bool SemanticState::ConsumerImpl::accept
     // checkCredit fails because the message is to big, we should
     // remain on queue's listener list for possible smaller messages
     // in future.
-    // 
+    //
     blocked = !(filter(msg) && checkCredit(msg));
     return !blocked;
 }
@@ -372,7 +375,7 @@ void SemanticState::ConsumerImpl::alloca
 {
     assertClusterSafe();
     uint32_t originalMsgCredit = msgCredit;
-    uint32_t originalByteCredit = byteCredit;        
+    uint32_t originalByteCredit = byteCredit;
     if (msgCredit != 0xFFFFFFFF) {
         msgCredit--;
     }
@@ -382,7 +385,7 @@ void SemanticState::ConsumerImpl::alloca
     QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
              << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
              << " now bytes: " << byteCredit << " msgs: " << msgCredit);
-    
+
 }
 
 bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
@@ -396,7 +399,7 @@ bool SemanticState::ConsumerImpl::checkC
     return enoughCredit;
 }
 
-SemanticState::ConsumerImpl::~ConsumerImpl() 
+SemanticState::ConsumerImpl::~ConsumerImpl()
 {
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
@@ -414,7 +417,7 @@ void SemanticState::unsubscribe(Consumer
     Queue::shared_ptr queue = c->getQueue();
     if(queue) {
         queue->cancel(c);
-        if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {            
+        if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
             Queue::tryAutoDelete(session.getBroker(), queue);
         }
     }
@@ -457,7 +460,7 @@ const std::string nullstring;
 
 void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
     msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
-    
+
     std::string exchangeName = msg->getExchangeName();
     if (!cacheExchange || cacheExchange->getName() != exchangeName)
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
@@ -466,7 +469,7 @@ void SemanticState::route(intrusive_ptr<
     /* verify the userid if specified: */
     std::string id =
     	msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
-    
+
     if (authMsg &&  !id.empty() && !(id == userID || (isDefaultRealm && id == userName)))
     {
         QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
@@ -484,7 +487,7 @@ void SemanticState::route(intrusive_ptr<
 
     if (!strategy.delivered) {
         //TODO:if discard-unroutable, just drop it
-        //TODO:else if accept-mode is explicit, reject it 
+        //TODO:else if accept-mode is explicit, reject it
         //else route it to alternate exchange
         if (cacheExchange->getAlternate()) {
             cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -513,7 +516,7 @@ void SemanticState::ConsumerImpl::reques
 }
 
 bool SemanticState::complete(DeliveryRecord& delivery)
-{    
+{
     ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
     if (i != consumers.end()) {
         i->second->complete(delivery);
@@ -541,7 +544,7 @@ void SemanticState::recover(bool requeue
         unacked.clear();
         for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
     }else{
-        for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));        
+        for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
         //unconfirmed messages re redelivered and therefore have their
         //id adjusted, confirmed messages are not and so the ordering
         //w.r.t id is lost
@@ -673,7 +676,7 @@ Queue::shared_ptr SemanticState::getQueu
 }
 
 AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
-{   
+{
     return DeliveryRecord::findRange(unacked, first, last);
 }
 
@@ -764,13 +767,13 @@ void SemanticState::accepted(const Seque
         //in transactional mode, don't dequeue or remove, just
         //maintain set of acknowledged messages:
         accumulatedAck.add(commands);
-        
+
         if (dtxBuffer.get()) {
             //if enlisted in a dtx, copy the relevant slice from
             //unacked and record it against that transaction
             TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
             accumulatedAck.clear();
-            dtxBuffer->enlist(txAck);    
+            dtxBuffer->enlist(txAck);
 
             //mark the relevant messages as 'ended' in unacked
             //if the messages are already completed, they can be
@@ -792,7 +795,6 @@ void SemanticState::accepted(const Seque
 }
 
 void SemanticState::completed(const SequenceSet& commands) {
-    assertClusterSafe();
     DeliveryRecords::iterator removed =
         remove_if(unacked.begin(), unacked.end(),
                   isInSequenceSetAnd(commands,
@@ -803,7 +805,6 @@ void SemanticState::completed(const Sequ
 
 void SemanticState::attached()
 {
-    assertClusterSafe();
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
         i->second->enableNotify();
         session.getConnection().outputTasks.addOutputTask(i->second.get());
@@ -813,7 +814,6 @@ void SemanticState::attached()
 
 void SemanticState::detached()
 {
-    assertClusterSafe();
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
         i->second->disableNotify();
         session.getConnection().outputTasks.removeOutputTask(i->second.get());

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SemanticState.h Tue Mar 15 01:54:07 2011
@@ -205,7 +205,7 @@ class SemanticState : private boost::non
                  const std::string& resumeId=std::string(), uint64_t resumeTtl=0,
                  const framing::FieldTable& = framing::FieldTable());
 
-    void cancel(const std::string& tag);
+    bool cancel(const std::string& tag);
 
     void setWindowMode(const std::string& destination);
     void setCreditMode(const std::string& destination);

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Mar 15 01:54:07 2011
@@ -114,7 +114,7 @@ void SessionAdapter::ExchangeHandlerImpl
                                                                  "existing"));
             }
         }catch(UnknownExchangeTypeException& /*e*/){
-            throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
+            throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type));
         }
     }
 }
@@ -353,12 +353,12 @@ void SessionAdapter::QueueHandlerImpl::c
     } else if(ifUnused && queue->getConsumerCount() > 0) {
         throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
                                                    << queue->getName() << "; queue in use"));
-    } else if (queue->isExclusiveOwner(&getConnection())) {
+    } else if (queue->isExclusiveOwner(&session)) {
         //remove the queue from the list of exclusive queues if necessary
-        QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(),
-                                            getConnection().exclusiveQueues.end(),
+        QueueVector::iterator i = std::find(exclusiveQueues.begin(),
+                                            exclusiveQueues.end(),
                                             queue);
-        if (i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
+        if (i < exclusiveQueues.end()) exclusiveQueues.erase(i);
     }    
 }
         
@@ -431,7 +431,9 @@ SessionAdapter::MessageHandlerImpl::subs
 void
 SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
 {
-    state.cancel(destination);
+    if (!state.cancel(destination)) {
+        throw NotFoundException(QPID_MSG("No such subscription: " << destination));
+    }
 
     ManagementAgent* agent = getBroker().getManagementAgent();
     if (agent)

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Mar 15 01:54:07 2011
@@ -62,7 +62,7 @@ SessionState::SessionState(
       msgBuilder(&broker.getStore()),
       mgmtObject(0),
       rateFlowcontrol(0),
-      scheduledCompleterContext(new ScheduledCompleterContext(this))
+      asyncCommandCompleter(new AsyncCommandCompleter(this))
 {
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
@@ -102,25 +102,7 @@ SessionState::~SessionState() {
     if (flowControlTimer)
         flowControlTimer->cancel();
 
-    // clean up any outstanding incomplete commands
-    {
-        qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
-        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
-        incompleteCmds.clear();
-        while (!copy.empty()) {
-            boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
-            copy.erase(copy.begin());
-            {
-                // note: need to drop lock, as callback may attempt to take it.
-                qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
-                ref->cancel();
-            }
-        }
-    }
-
-    // At this point, we are guaranteed no further completion callbacks will be
-    // made.  Cancel any outstanding scheduledCompleter calls...
-    scheduledCompleterContext->cancel();
+    asyncCommandCompleter->cancel();
 }
 
 AMQP_ClientProxy& SessionState::getProxy() {
@@ -276,13 +258,11 @@ void SessionState::handleContent(AMQFram
             msg->getFrames().append(header);
         }
         msg->setPublisher(&getConnection());
-
-        boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg)));
-        msg->setIngressCompletion( ac );
-        ac->begin();
+        msg->getIngressCompletion().begin();
         semanticState.handle(msg);
         msgBuilder.end();
-        ac->end();  // allows msg to complete xfer
+        IncompleteIngressMsgXfer xfer(this, msg);
+        msg->getIngressCompletion().end(xfer);  // allows msg to complete xfer
     }
 
     // Handle producer session flow control
@@ -451,110 +431,94 @@ void SessionState::addPendingExecutionSy
 }
 
 
-/** factory for creating IncompleteIngressMsgXfer objects which
- * can be references from Messages as ingress AsyncCompletion objects.
+/** factory for creating a reference-counted IncompleteIngressMsgXfer object
+ * which will be attached to a message that will be completed asynchronously.
  */
-boost::shared_ptr<SessionState::IncompleteIngressMsgXfer>
-SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg)
+boost::intrusive_ptr<AsyncCompletion::Callback>
+SessionState::IncompleteIngressMsgXfer::clone()
 {
-    SequenceNumber id = msg->getCommandId();
-    boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg));
-    qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
-    incompleteCmds[id] = cmd;
-    return cmd;
+    boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
+    return cb;
 }
 
 
-/** Invoked by the asynchronous completer associated with
- * a received msg that is pending Completion.  May be invoked
- * by the SessionState directly (sync == true), or some external
- * entity (!sync).
+/** Invoked by the asynchronous completer associated with a received
+ * msg that is pending Completion.  May be invoked by the IO thread
+ * (sync == true), or some external thread (!sync).
  */
 void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
 {
     if (!sync) {
         /** note well: this path may execute in any thread.  It is safe to access
-         * the session, as the SessionState destructor will cancel all outstanding
-         * callbacks before getting destroyed (so we'll never get here).
+         * the scheduledCompleterContext, since *this has a shared pointer to it.
+         * but not session or msg!
          */
+        session = 0; msg = 0;
         QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
-        if (session->scheduledCompleterContext->scheduleCompletion(id))
-            session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
-                                                                     session->scheduledCompleterContext));
-    } else {  // command is being completed in IO thread.
-        // this path runs only on the IO thread.
-        qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
-        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
-        cmd = session->incompleteCmds.find(id);
-        if (cmd != session->incompleteCmds.end()) {
-            boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
-            session->incompleteCmds.erase(cmd);
-
-            if (session->isAttached()) {
-                QPID_LOG(debug, ": receive completed for msg seq=" << id);
-                qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
-                session->completeRcvMsg(id, requiresAccept, requiresSync);
-                return;
-            }
+        completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
+    } else {
+        // this path runs directly from the ac->end() call in handleContent() above,
+        // so *session and *msg are definately valid.
+        if (session->isAttached()) {
+            QPID_LOG(debug, ": receive completed for msg seq=" << id);
+            session->completeRcvMsg(id, requiresAccept, requiresSync);
         }
     }
+    completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
 }
 
 
-/** Scheduled from incomplete command's completed callback, safely completes all
- * completed commands in the IO Thread.  Guaranteed not to be running at the same
- * time as the message receive code.
+/** Scheduled from an asynchronous command's completed callback to run on
+ * the IO thread.
  */
-void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt)
+void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
 {
     ctxt->completeCommands();
 }
 
 
-/** mark a command (sequence) as completed, return True if caller should
- * schedule a call to completeCommands()
+/** mark an ingress Message.Transfer command as completed.
+ * This method must be thread safe - it may run on any thread.
  */
-bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd)
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
-
-    completedCmds.push_back(cmd);
-    return (completedCmds.size() == 1);
+void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
+                                                                bool requiresAccept,
+                                                                bool requiresSync)
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+
+    if (session) {
+        MessageInfo msg(cmd, requiresAccept, requiresSync);
+        completedMsgs.push_back(msg);
+        if (completedMsgs.size() == 1) {
+            session->getConnection().requestIOProcessing(boost::bind(&schedule,
+                                                                     session->asyncCommandCompleter));
+        }
+    }
 }
 
 
-/** Cause the session to complete all completed commands */
-void SessionState::ScheduledCompleterContext::completeCommands()
+/** Cause the session to complete all completed commands.
+ * Executes on the IO thread.
+ */
+void SessionState::AsyncCommandCompleter::completeCommands()
 {
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
 
     // when session is destroyed, it clears the session pointer via cancel().
-    if (!session) return;
-
-    while (!completedCmds.empty()) {
-        SequenceNumber id = completedCmds.front();
-        completedCmds.pop_front();
-        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
-        {
-            qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock);
-
-            cmd = session->incompleteCmds.find(id);
-            if (cmd !=session->incompleteCmds.end()) {
-                boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
-                {
-                    qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock);
-                    tmp->do_completion();   // retakes incompleteCmdslock
-                }
-            }
+    if (session && session->isAttached()) {
+        for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
+             msg != completedMsgs.end(); ++msg) {
+            session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
         }
     }
+    completedMsgs.clear();
 }
 
 
 /** cancel any pending calls to scheduleComplete */
-void SessionState::ScheduledCompleterContext::cancel()
+void SessionState::AsyncCommandCompleter::cancel()
 {
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
     session = 0;
 }
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionState.h?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/SessionState.h Tue Mar 15 01:54:07 2011
@@ -38,6 +38,7 @@
 
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
 
 #include <set>
 #include <vector>
@@ -176,79 +177,84 @@ class SessionState : public qpid::Sessio
     std::queue<SequenceNumber> pendingExecutionSyncs;
     bool currentCommandComplete;
 
-    /** Abstract class that represents a command that is pending
-     * completion.
+    /** This class provides a context for completing asynchronous commands in a thread
+     * safe manner.  Asynchronous commands save their completion state in this class.
+     * This class then schedules the completeCommands() method in the IO thread.
+     * While running in the IO thread, completeCommands() may safely complete all
+     * saved commands without the risk of colliding with other operations on this
+     * SessionState.
      */
-    class IncompleteCommandContext : public AsyncCompletion
+    class AsyncCommandCompleter : public RefCounted {
+    private:
+        SessionState *session;
+        qpid::sys::Mutex completerLock;
+
+        // special-case message.transfer commands for optimization
+        struct MessageInfo {
+            SequenceNumber cmd; // message.transfer command id
+            bool requiresAccept;
+            bool requiresSync;
+        MessageInfo(SequenceNumber c, bool a, bool s)
+        : cmd(c), requiresAccept(a), requiresSync(s) {}
+        };
+        std::vector<MessageInfo> completedMsgs;
+
+        /** complete all pending commands, runs in IO thread */
+        void completeCommands();
+
+        /** for scheduling a run of "completeCommands()" on the IO thread */
+        static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
+
+  public:
+        AsyncCommandCompleter(SessionState *s) : session(s) {};
+        ~AsyncCommandCompleter() {};
+
+        /** schedule the completion of an ingress message.transfer command */
+        void scheduleMsgCompletion(SequenceNumber cmd,
+                                   bool requiresAccept,
+                                   bool requiresSync);
+        void cancel();  // called by SessionState destructor.
+    };
+    boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
+
+    /** Abstract class that represents a single asynchronous command that is
+     * pending completion.
+     */
+    class AsyncCommandContext : public AsyncCompletion::Callback
     {
      public:
-        IncompleteCommandContext( SessionState *ss, SequenceNumber _id )
-          : id(_id), session(ss) {}
-        virtual ~IncompleteCommandContext() {}
-
-        /* allows manual invokation of completion, used by IO thread to
-         * complete a command that was originally finished on a different
-         * thread.
-         */
-        void do_completion() { completed(true); }
+        AsyncCommandContext( SessionState *ss, SequenceNumber _id )
+          : id(_id), completerContext(ss->asyncCommandCompleter) {}
+        virtual ~AsyncCommandContext() {}
 
      protected:
         SequenceNumber id;
-        SessionState    *session;
+        boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
     };
 
     /** incomplete Message.transfer commands - inbound to broker from client
      */
-    class IncompleteIngressMsgXfer : public SessionState::IncompleteCommandContext
+    class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
     {
      public:
         IncompleteIngressMsgXfer( SessionState *ss,
-                                  SequenceNumber _id,
-                                  boost::intrusive_ptr<Message> msg )
-          : IncompleteCommandContext(ss, _id),
-          requiresAccept(msg->requiresAccept()),
-          requiresSync(msg->getFrames().getMethod()->isSync()) {};
+                                  boost::intrusive_ptr<Message> m )
+          : AsyncCommandContext(ss, m->getCommandId()),
+            session(ss),
+            msg(m.get()),
+            requiresAccept(msg->requiresAccept()),
+            requiresSync(msg->getFrames().getMethod()->isSync()) {};
         virtual ~IncompleteIngressMsgXfer() {};
 
-     protected:
         virtual void completed(bool);
+        virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
 
      private:
-        /** meta-info required to complete the message */
+        SessionState *session;  // only valid if sync == true
+        Message *msg;           // only valid if sync == true
         bool requiresAccept;
-        bool requiresSync;  // method's isSync() flag
+        bool requiresSync;
     };
-    /** creates a command context suitable for use as an AsyncCompletion in a message */
-    boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> createIngressMsgXferContext( boost::intrusive_ptr<Message> msg);
-
-    /* A list of commands that are pending completion.  These commands are
-     * awaiting some set of asynchronous operations to finish (eg: store,
-     * flow-control, etc). before the command can be completed to the client
-     */
-    std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds;
-    qpid::sys::Mutex incompleteCmdsLock;  // locks above container
-
-    /** This context is shared between the SessionState and scheduledCompleter,
-     * holds the sequence numbers of all commands that have completed asynchronously.
-     */
-    class ScheduledCompleterContext {
-    private:
-        std::list<SequenceNumber> completedCmds;
-        // ordering: take this lock first, then incompleteCmdsLock
-        qpid::sys::Mutex completedCmdsLock;
-        SessionState *session;
-    public:
-        ScheduledCompleterContext(SessionState *s) : session(s) {};
-        bool scheduleCompletion(SequenceNumber cmd);
-        void completeCommands();
-        void cancel();
-    };
-    boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext;
-
-    /** The following method runs the in IO thread and completes commands that
-     * where finished asynchronously.
-     */
-    static void scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>);
 
     friend class SessionManager;
 };

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp Tue Mar 15 01:54:07 2011
@@ -28,6 +28,52 @@
 
 namespace qpid {
 namespace broker {
+namespace {
+const qmf::org::apache::qpid::broker::EventQueueThresholdExceeded EVENT("dummy", 0, 0);
+bool isQMFv2(const boost::intrusive_ptr<Message> message)
+{
+    const qpid::framing::MessageProperties* props = message->getProperties<qpid::framing::MessageProperties>();
+    return props && props->getAppId() == "qmf2";
+}
+
+bool isThresholdEvent(const boost::intrusive_ptr<Message> message)
+{
+    if (message->getIsManagementMessage()) {
+        //is this a qmf event? if so is it a threshold event?
+        if (isQMFv2(message)) {
+            const qpid::framing::FieldTable* headers = message->getApplicationHeaders();
+            if (headers && headers->getAsString("qmf.content") == "_event") {
+                //decode as list
+                std::string content = message->getFrames().getContent();
+                qpid::types::Variant::List list;
+                qpid::amqp_0_10::ListCodec::decode(content, list);
+                if (list.empty() || list.front().getType() != qpid::types::VAR_MAP) return false;
+                qpid::types::Variant::Map map = list.front().asMap();
+                try {
+                    std::string eventName = map["_schema_id"].asMap()["_class_name"].asString();
+                    return eventName == EVENT.getEventName();
+                } catch (const std::exception& e) {
+                    QPID_LOG(error, "Error checking for recursive threshold alert: " << e.what());
+                }
+            }
+        } else {
+            std::string content = message->getFrames().getContent();
+            qpid::framing::Buffer buffer(const_cast<char*>(content.data()), content.size());
+            if (buffer.getOctet() == 'A' && buffer.getOctet() == 'M' && buffer.getOctet() == '2' && buffer.getOctet() == 'e') {
+                buffer.getLong();//sequence
+                std::string packageName;
+                buffer.getShortString(packageName);
+                if (packageName != EVENT.getPackageName()) return false;
+                std::string eventName;
+                buffer.getShortString(eventName);
+                return eventName == EVENT.getEventName();
+            }
+        }
+    }
+    return false;
+}
+}
+
 ThresholdAlerts::ThresholdAlerts(const std::string& n,
                                  qpid::management::ManagementAgent& a,
                                  const uint32_t ct,
@@ -44,8 +90,13 @@ void ThresholdAlerts::enqueued(const Que
     if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
         if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
             || qpid::sys::Duration(lastAlert, qpid::sys::now()) > repeatInterval) {
-            agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
+            //Note: Raising an event may result in messages being
+            //enqueued on queues; it may even be that this event
+            //causes a message to be enqueued on the queue we are
+            //tracking, and so we need to avoid recursing
+            if (isThresholdEvent(m.payload)) return;
             lastAlert = qpid::sys::now();
+            agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
         }
     }
 }
@@ -75,12 +126,12 @@ void ThresholdAlerts::observe(Queue& que
 }
 
 void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                              const qpid::framing::FieldTable& settings)
+                              const qpid::framing::FieldTable& settings, uint16_t limitRatio)
 
 {
     qpid::types::Variant::Map map;
     qpid::amqp_0_10::translate(settings, map);
-    observe(queue, agent, map);
+    observe(queue, agent, map, limitRatio);
 }
 
 template <class T>
@@ -118,19 +169,19 @@ class Option
 };
 
 void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                              const qpid::types::Variant::Map& settings)
+                              const qpid::types::Variant::Map& settings, uint16_t limitRatio)
 
 {
     //Note: aliases are keys defined by java broker
     Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
     repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
 
-    //If no explicit threshold settings were given use 80% of any
-    //limit from the policy.
+    //If no explicit threshold settings were given use specified
+    //percentage of any limit from the policy.
     const QueuePolicy* policy = queue.getPolicy();
-    Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy ? policy->getMaxCount()*0.8 : 0));
+    Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy && limitRatio ? (policy->getMaxCount()*limitRatio/100) : 0));
     countThreshold.addAlias("x-qpid-maximum-message-count");
-    Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy ? policy->getMaxSize()*0.8 : 0));
+    Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy && limitRatio ? (policy->getMaxSize()*limitRatio/100) : 0));
     sizeThreshold.addAlias("x-qpid-maximum-message-size");
 
     observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ThresholdAlerts.h?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ThresholdAlerts.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ThresholdAlerts.h Tue Mar 15 01:54:07 2011
@@ -55,9 +55,9 @@ class ThresholdAlerts : public QueueObse
                         const uint64_t sizeThreshold,
                         const long repeatInterval);
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                        const qpid::framing::FieldTable& settings);
+                        const qpid::framing::FieldTable& settings, uint16_t limitRatio);
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                        const qpid::types::Variant::Map& settings);
+                        const qpid::types::Variant::Map& settings, uint16_t limitRatio);
   private:
     const std::string name;
     qpid::management::ManagementAgent& agent;

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.cpp Tue Mar 15 01:54:07 2011
@@ -221,6 +221,7 @@ TopicExchange::TopicExchange(const std::
 
 bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
 {
+	ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
     string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
     string fedTags(args ? args->getAsString(qpidFedTags) : "");
     string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
@@ -288,6 +289,7 @@ bool TopicExchange::bind(Queue::shared_p
 }
 
 bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){
+	ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
     RWlock::ScopedWlock l(lock);
     string routingKey = normalize(constRoutingKey);
     BindingKey* bk = bindingTree.getBindingKey(routingKey);
@@ -333,13 +335,24 @@ bool TopicExchange::isBound(Queue::share
 void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
 {
     // Note: PERFORMANCE CRITICAL!!!
-    BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+    BindingList b;
+	std::map<std::string, BindingList>::iterator it;
+	{  // only lock the cache for read
+       RWlock::ScopedRlock cl(cacheLock);
+	   it = bindingCache.find(routingKey);
+	}
     PreRoute pr(msg, this);
-    BindingsFinderIter bindingsFinder(b);
+    if (it == bindingCache.end())  // no cache hit
     {
         RWlock::ScopedRlock l(lock);
+    	b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+        BindingsFinderIter bindingsFinder(b);
         bindingTree.iterateMatch(routingKey, bindingsFinder);
-    }
+	    RWlock::ScopedWlock cwl(cacheLock);
+		bindingCache[routingKey] = b; // update cache
+    }else {
+        b = it->second;
+     }
     doRoute(msg, b);
 }
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.h Tue Mar 15 01:54:07 2011
@@ -135,7 +135,19 @@ class TopicExchange : public virtual Exc
     BindingNode bindingTree;
     unsigned long nBindings;
     qpid::sys::RWlock lock;     // protects bindingTree and nBindings
-
+    qpid::sys::RWlock cacheLock;     // protects cache
+	std::map<std::string, BindingList> bindingCache; // cache of matched routes.
+	class ClearCache {
+	private:
+		qpid::sys::RWlock* cacheLock;
+		std::map<std::string, BindingList>* bindingCache; 
+	public:
+		ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),bindingCache(bc) {};
+		~ClearCache(){ 
+			qpid::sys::RWlock::ScopedWlock l(*cacheLock);
+			bindingCache->clear();   
+		};
+	};
     bool isBound(Queue::shared_ptr queue, const std::string& pattern);
 
     class ReOriginIter;

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Tue Mar 15 01:54:07 2011
@@ -22,6 +22,7 @@
 #include "qpid/client/ConnectionHandler.h"
 
 #include "qpid/SaslFactory.h"
+#include "qpid/StringUtils.h"
 #include "qpid/client/Bounds.h"
 #include "qpid/framing/amqp_framing.h"
 #include "qpid/framing/all_method_bodies.h"
@@ -202,6 +203,24 @@ void ConnectionHandler::fail(const std::
 
 namespace {
 std::string SPACE(" ");
+
+std::string join(const std::vector<std::string>& in)
+{
+    std::string result;
+    for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) {
+        if (result.size()) result += SPACE;
+        result += *i;
+    }
+    return result;
+}
+
+void intersection(const std::vector<std::string>& a, const std::vector<std::string>& b, std::vector<std::string>& results)
+{
+    for (std::vector<std::string>::const_iterator i = a.begin(); i != a.end(); ++i) {
+        if (std::find(b.begin(), b.end(), *i) != b.end())  results.push_back(*i);
+    }
+}
+
 }
 
 void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/)
@@ -216,25 +235,24 @@ void ConnectionHandler::start(const Fiel
                                               maxSsf
                                             );
 
-    std::string mechlist;
-    bool chosenMechanismSupported = mechanism.empty();
-    for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) {
-        if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) {
-            chosenMechanismSupported = true;
-            mechlist = (*i)->get<std::string>() + SPACE + mechlist;
-        } else {
-            if (i != mechanisms.begin()) mechlist += SPACE;
-            mechlist += (*i)->get<std::string>();
+    std::vector<std::string> mechlist;
+    if (mechanism.empty()) {
+        //mechlist is simply what the server offers
+        mechanisms.collect(mechlist);
+    } else {
+        //mechlist is the intersection of those indicated by user and
+        //those supported by server, in the order listed by user
+        std::vector<std::string> allowed = split(mechanism, " ");
+        std::vector<std::string> supported;
+        mechanisms.collect(supported);
+        intersection(allowed, supported, mechlist);
+        if (mechlist.empty()) {
+            throw Exception(QPID_MSG("Desired mechanism(s) not valid: " << mechanism << " (supported: " << join(supported) << ")"));
         }
     }
 
-    if (!chosenMechanismSupported) {
-        fail("Selected mechanism not supported: " + mechanism);
-    }
-
     if (sasl.get()) {
-        string response = sasl->start(mechanism.empty() ? mechlist : mechanism,
-                                      getSecuritySettings ? getSecuritySettings() : 0);
+        string response = sasl->start(join(mechlist), getSecuritySettings ? getSecuritySettings() : 0);
         proxy.startOk(properties, sasl->getMechanism(), response, locale);
     } else {
         //TODO: verify that desired mechanism and locale are supported

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/TCPConnector.cpp Tue Mar 15 01:54:07 2011
@@ -117,7 +117,7 @@ void TCPConnector::connected(const Socke
 
 void TCPConnector::start(sys::AsynchIO* aio_) {
     aio = aio_;
-    for (int i = 0; i < 32; i++) {
+    for (int i = 0; i < 4; i++) {
         aio->queueReadBuffer(new Buff(maxFrameSize));
     }
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Tue Mar 15 01:54:07 2011
@@ -39,6 +39,7 @@ using qpid::types::Variant;
 using qpid::types::VAR_LIST;
 using qpid::framing::Uuid;
 
+namespace {
 void convert(const Variant::List& from, std::vector<std::string>& to)
 {
     for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
@@ -46,19 +47,6 @@ void convert(const Variant::List& from, 
     }
 }
 
-template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value)
-{
-    Variant::Map::const_iterator i = map.find(key);
-    if (i != map.end()) {
-        value = (T) i->second;
-        QPID_LOG(debug, "option " << key << " specified as " << i->second);
-        return true;
-    } else {
-        return false;
-    }
-}
-
-namespace {
 std::string asString(const std::vector<std::string>& v) {
     std::stringstream os;
     os << "[";
@@ -71,47 +59,6 @@ std::string asString(const std::vector<s
 }
 }
 
-template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
-                                            const std::string& key,
-                                            std::vector<std::string>& value)
-{
-    Variant::Map::const_iterator i = map.find(key);
-    if (i != map.end()) {
-        value.clear();
-        if (i->second.getType() == VAR_LIST) {
-            convert(i->second.asList(), value);
-        } else {
-            value.push_back(i->second.asString());
-        }
-        QPID_LOG(debug, "option " << key << " specified as " << asString(value));
-        return true;
-    } else {
-        return false;
-    }
-}
-
-void convert(const Variant::Map& from, ConnectionSettings& to)
-{
-    setIfFound(from, "username", to.username);
-    setIfFound(from, "password", to.password);
-    setIfFound(from, "sasl-mechanism", to.mechanism);
-    setIfFound(from, "sasl-service", to.service);
-    setIfFound(from, "sasl-min-ssf", to.minSsf);
-    setIfFound(from, "sasl-max-ssf", to.maxSsf);
-
-    setIfFound(from, "heartbeat", to.heartbeat);
-    setIfFound(from, "tcp-nodelay", to.tcpNoDelay);
-
-    setIfFound(from, "locale", to.locale);
-    setIfFound(from, "max-channels", to.maxChannels);
-    setIfFound(from, "max-frame-size", to.maxFrameSize);
-    setIfFound(from, "bounds", to.bounds);
-
-    setIfFound(from, "transport", to.protocol);
-
-    setIfFound(from, "ssl-cert-name", to.sslCertName);
-}
-
 ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
     reconnect(false), timeout(-1), limit(-1),
     minReconnectInterval(3), maxReconnectInterval(60),
@@ -124,27 +71,64 @@ ConnectionImpl::ConnectionImpl(const std
 
 void ConnectionImpl::setOptions(const Variant::Map& options)
 {
-    sys::Mutex::ScopedLock l(lock);
-    convert(options, settings);
-    setIfFound(options, "reconnect", reconnect);
-    setIfFound(options, "reconnect-timeout", timeout);
-    setIfFound(options, "reconnect-limit", limit);
-    int64_t reconnectInterval;
-    if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
-        minReconnectInterval = maxReconnectInterval = reconnectInterval;
-    } else {
-        setIfFound(options, "reconnect-interval-min", minReconnectInterval);
-        setIfFound(options, "reconnect-interval-max", maxReconnectInterval);
+    for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
+        setOption(i->first, i->second);
     }
-    setIfFound(options, "reconnect-urls", urls);
-    setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
 }
 
 void ConnectionImpl::setOption(const std::string& name, const Variant& value)
 {
-    Variant::Map options;
-    options[name] = value;
-    setOptions(options);
+    sys::Mutex::ScopedLock l(lock);
+    if (name == "reconnect") {
+        reconnect = value;
+    } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
+        timeout = value;
+    } else if (name == "reconnect-limit" || name == "reconnect_limit") {
+        limit = value;
+    } else if (name == "reconnect-interval" || name == "reconnect_interval") {
+        maxReconnectInterval = minReconnectInterval = value;
+    } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
+        minReconnectInterval = value;
+    } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
+        maxReconnectInterval = value;
+    } else if (name == "reconnect-urls" || name == "reconnect_urls") {
+        if (value.getType() == VAR_LIST) {
+            convert(value.asList(), urls);
+        } else {
+            urls.push_back(value.asString());
+        }
+    } else if (name == "username") {
+        settings.username = value.asString();
+    } else if (name == "password") {
+        settings.password = value.asString();
+    } else if (name == "sasl-mechanism" || name == "sasl_mechanism" ||
+               name == "sasl-mechanisms" || name == "sasl_mechanisms") {
+        settings.mechanism = value.asString();
+    } else if (name == "sasl-service" || name == "sasl_service") {
+        settings.service = value.asString();
+    } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") {
+        settings.minSsf = value;
+    } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") {
+        settings.maxSsf = value;
+    } else if (name == "heartbeat") {
+        settings.heartbeat = value;
+    } else if (name == "tcp-nodelay" || name == "tcp_nodelay") {
+        settings.tcpNoDelay = value;
+    } else if (name == "locale") {
+        settings.locale = value.asString();
+    } else if (name == "max-channels" || name == "max_channels") {
+        settings.maxChannels = value;
+    } else if (name == "max-frame-size" || name == "max_frame_size") {
+        settings.maxFrameSize = value;
+    } else if (name == "bounds") {
+        settings.bounds = value;
+    } else if (name == "transport") {
+        settings.protocol = value.asString();
+    } else if (name == "ssl-cert-name" || name == "ssl_cert_name") {
+        settings.sslCertName = value.asString();
+    } else {
+        throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
+    }
 }
 
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Tue Mar 15 01:54:07 2011
@@ -59,7 +59,9 @@ void OutgoingMessage::convert(const qpid
         message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
     }
     translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
-    message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
+    if (from.getTtl().getMilliseconds()) {
+        message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
+    }
     if (from.getDurable()) {
         message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT);
     }

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Tue Mar 15 01:54:07 2011
@@ -135,6 +135,7 @@ void SenderImpl::sendUnreliable(const qp
 void SenderImpl::replay(const sys::Mutex::ScopedLock&)
 {
     for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
+        i->message.setRedelivered(true);
         sink->send(session, name, *i);
     }
 }
@@ -147,7 +148,7 @@ uint32_t SenderImpl::checkPendingSends(b
 uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&)
 {
     if (flush) {
-        session.flush(); 
+        session.flush();
         flushed = true;
     } else {
         flushed = false;

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Mar 15 01:54:07 2011
@@ -943,6 +943,10 @@ void Cluster::checkUpdateIn(Lock& l) {
             mAgent->suppress(false); // Enable management output.
             mAgent->clusterUpdate();
         }
+        // Restore alternate exchange settings on exchanges.
+        broker.getExchanges().eachExchange(
+            boost::bind(&broker::Exchange::recoveryComplete, _1,
+                        boost::ref(broker.getExchanges())));
         enableClusterSafe();    // Enable cluster-safe assertions
         deliverEventQueue.start();
     }

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Mar 15 01:54:07 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -143,7 +143,7 @@ void Connection::init() {
 // Called when we have consumed a read buffer to give credit to the
 // connection layer to continue reading.
 void Connection::giveReadCredit(int credit) {
-    if (cluster.getSettings().readMax && credit) 
+    if (cluster.getSettings().readMax && credit)
         output.giveReadCredit(credit);
 }
 
@@ -201,7 +201,7 @@ void Connection::received(framing::AMQFr
     }
     else {             // Shadow or updated catch-up connection.
         if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
-            if (isShadow()) 
+            if (isShadow())
                 cluster.addShadowConnection(this);
             AMQFrame ok((ConnectionCloseOkBody()));
             connection->getOutput().send(ok);
@@ -241,7 +241,7 @@ void Connection::deliverDoOutput(uint32_
 void Connection::deliveredFrame(const EventFrame& f) {
     GiveReadCreditOnExit gc(*this, f.readCredit);
     assert(!catchUp);
-    currentChannel = f.frame.getChannel(); 
+    currentChannel = f.frame.getChannel();
     if (f.frame.getBody()       // frame can be emtpy with just readCredit
         && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
         && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
@@ -287,7 +287,7 @@ void Connection::deliverClose () {
     cluster.erase(self);
 }
 
-// Close the connection 
+// Close the connection
 void Connection::close() {
     if (connection.get()) {
         QPID_LOG(debug, cluster << " closed connection " << *this);
@@ -332,9 +332,9 @@ size_t Connection::decode(const char* da
         if (!checkProtocolHeader(ptr, size)) // Updates ptr
             return 0; // Incomplete header
 
-        if (!connection->isOpen()) 
+        if (!connection->isOpen())
             processInitialFrames(ptr, end-ptr); // Updates ptr
-        
+
         if (connection->isOpen() && end - ptr > 0) {
             // We're multi-casting, we will give read credit on delivery.
             grc.credit = 0;
@@ -432,7 +432,7 @@ void Connection::sessionState(
         unknownCompleted,
         receivedIncomplete);
     QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
-    // The output tasks will be added later in the update process. 
+    // The output tasks will be added later in the update process.
     connection->getOutputTasks().removeAll();
 }
 
@@ -478,7 +478,7 @@ void Connection::retractOffer() {
 
 void Connection::closeUpdated() {
     self.second = 0;      // Mark this as completed update connection.
-    if (connection.get()) 
+    if (connection.get())
         connection->close(connection::CLOSE_CODE_NORMAL, "OK");
 }
 
@@ -529,7 +529,7 @@ void Connection::deliveryRecord(const st
             m = getUpdateMessage();
             m.queue = queue.get();
             m.position = position;
-            if (enqueued) queue->updateEnqueued(m); //inform queue of the message 
+            if (enqueued) queue->updateEnqueued(m); //inform queue of the message
         } else {                // Message at original position in original queue
             m = queue->find(position);
         }
@@ -591,7 +591,7 @@ void Connection::txEnqueue(const std::st
 
 void Connection::txPublish(const framing::Array& queues, bool delivered) {
     boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
-    for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) 
+    for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
         txPub->deliverTo(findQueue((*i)->get<std::string>()));
     txPub->delivered = delivered;
     txBuffer->enlist(txPub);
@@ -614,12 +614,6 @@ void Connection::exchange(const std::str
     QPID_LOG(debug, cluster << " updated exchange " << ex->getName());
 }
 
-void Connection::queue(const std::string& encoded) {
-    Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
-    broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
-    QPID_LOG(debug, cluster << " updated queue " << q->getName());
-}
-
 void Connection::sessionError(uint16_t , const std::string& msg) {
     // Ignore errors before isOpen(), we're not multicasting yet.
     if (connection->isOpen())
@@ -678,6 +672,12 @@ void Connection::config(const std::strin
     else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
 }
 
+void Connection::doCatchupIoCallbacks() {
+    // We need to process IO callbacks during the catch-up phase in
+    // order to service asynchronous completions for messages
+    // transferred during catch-up.
 
+    if (catchUp) getBrokerConnection()->doIoCallbacks();
+}
 }} // Namespace qpid::cluster
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Connection.h?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Connection.h Tue Mar 15 01:54:07 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -62,7 +62,7 @@ class Connection :
         public sys::ConnectionInputHandler,
         public framing::AMQP_AllOperations::ClusterConnectionHandler,
         private broker::Connection::ErrorListener
-        
+
 {
   public:
 
@@ -73,7 +73,7 @@ class Connection :
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id,
                const qpid::sys::SecuritySettings& external);
     ~Connection();
-    
+
     ConnectionId getId() const { return self; }
     broker::Connection* getBrokerConnection() { return connection.get(); }
     const broker::Connection* getBrokerConnection() const { return connection.get(); }
@@ -108,9 +108,9 @@ class Connection :
     void deliveredFrame(const EventFrame&);
 
     void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position);
-    
+
     // ==== Used in catch-up mode to build initial state.
-    // 
+    //
     // State update methods.
     void shadowPrepare(const std::string&);
 
@@ -123,9 +123,9 @@ class Connection :
                       const framing::SequenceNumber& received,
                       const framing::SequenceSet& unknownCompleted,
                       const SequenceSet& receivedIncomplete);
-    
+
     void outputTask(uint16_t channel, const std::string& name);
-    
+
     void shadowReady(uint64_t memberId,
                      uint64_t connectionId,
                      const std::string& managementId,
@@ -163,8 +163,7 @@ class Connection :
     void txEnd();
     void accumulatedAck(const framing::SequenceSet&);
 
-    // Encoded queue/exchange replication.
-    void queue(const std::string& encoded);
+    // Encoded exchange replication.
     void exchange(const std::string& encoded);
 
     void giveReadCredit(int credit);
@@ -189,6 +188,8 @@ class Connection :
 
     void setSecureConnection ( broker::SecureConnection * sc );
 
+    void doCatchupIoCallbacks();
+
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
@@ -233,7 +234,7 @@ class Connection :
     // Error listener functions
     void connectionError(const std::string&);
     void sessionError(uint16_t channel, const std::string&);
-    
+
     void init();
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverDoOutput(uint32_t limit);

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue Mar 15 01:54:07 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -66,11 +66,14 @@ void OutputInterceptor::giveReadCredit(i
 }
 
 // Called in write thread when the IO layer has no more data to write.
-// We do nothing in the write thread, we run doOutput only on delivery
-// of doOutput requests.
-bool OutputInterceptor::doOutput() { return false; }
+// We only process IO callbacks in the write thread during catch-up.
+// Normally we run doOutput only on delivery of doOutput requests.
+bool OutputInterceptor::doOutput() {
+    parent.doCatchupIoCallbacks();
+    return false;
+}
 
-// Send output up to limit, calculate new limit. 
+// Send output up to limit, calculate new limit.
 void OutputInterceptor::deliverDoOutput(uint32_t limit) {
     sentDoOutput = false;
     sendMax = limit;
@@ -78,7 +81,7 @@ void OutputInterceptor::deliverDoOutput(
     if (parent.isLocal()) {
         size_t buffered = getBuffered();
         if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit.
-            newLimit = sendMax*2; 
+            newLimit = sendMax*2;
         else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit.
             newLimit = (sendMax + sent) / 2;
     }

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp Tue Mar 15 01:54:07 2011
@@ -36,13 +36,8 @@ const std::string UpdateDataExchange::MA
 const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
 const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
 
-std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) {
-    return o << "cluster(" << c.clusterId << " UPDATER)";
-}
-
 UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
-    Exchange(EXCHANGE_NAME, &cluster),
-    clusterId(cluster.getId())
+    Exchange(EXCHANGE_NAME, &cluster)
 {}
 
 void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
@@ -62,11 +57,9 @@ void UpdateDataExchange::updateManagemen
 
     framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
     agent->importAgents(buf1);
-    QPID_LOG(debug, *this << " updated management agents.");
 
     framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
     agent->importSchemas(buf2);
-    QPID_LOG(debug, *this << " updated management schemas.");
 
     using amqp_0_10::ListCodec;
     using types::Variant;
@@ -78,7 +71,6 @@ void UpdateDataExchange::updateManagemen
                               new management::ManagementAgent::DeletedObject(*i)));
     }
     agent->importDeletedObjects(objects);
-    QPID_LOG(debug, *this << " updated management deleted objects.");
 }
 
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h Tue Mar 15 01:54:07 2011
@@ -74,11 +74,9 @@ class UpdateDataExchange : public broker
     void updateManagementAgent(management::ManagementAgent* agent);
 
   private:
-    MemberId clusterId;
     std::string managementAgents;
     std::string managementSchemas;
     std::string managementDeletedObjects;
-  friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Mar 15 01:54:07 2011
@@ -75,6 +75,18 @@ namespace {
         }
         return n2;
     }
+
+struct ScopedManagementContext
+{
+    ScopedManagementContext(const qpid::broker::ConnectionState* context)
+    {
+        setManagementExecutionContext(context);
+    }
+    ~ScopedManagementContext()
+    {
+        setManagementExecutionContext(0);
+    }
+};
 }
 
 
@@ -536,6 +548,7 @@ void ManagementAgent::sendBufferLH(Buffe
     dp->setRoutingKey(routingKey);
 
     msg->getFrames().append(content);
+    msg->setIsManagementMessage(true);
 
     {
         sys::Mutex::ScopedUnlock u(userLock);
@@ -612,6 +625,7 @@ void ManagementAgent::sendBufferLH(const
         msg->setTimestamp(broker->getExpiryPolicy());
     }
     msg->getFrames().append(content);
+    msg->setIsManagementMessage(true);
 
     {
         sys::Mutex::ScopedUnlock u(userLock);
@@ -2238,7 +2252,7 @@ void ManagementAgent::dispatchAgentComma
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
-    setManagementExecutionContext((const qpid::broker::ConnectionState*) msg.getPublisher());
+    ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
     const framing::FieldTable *headers = msg.getApplicationHeaders();
     if (headers && msg.getAppId() == "qmf2")
     {

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/ClusterSafe.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/ClusterSafe.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/ClusterSafe.cpp Tue Mar 15 01:54:07 2011
@@ -51,6 +51,16 @@ ClusterSafeScope::~ClusterSafeScope() {
     inContext = save;
 }
 
+ClusterUnsafeScope::ClusterUnsafeScope()  {
+    save = inContext;
+    inContext = false;
+}
+
+ClusterUnsafeScope::~ClusterUnsafeScope() {
+    assert(!inContext);
+    inContext = save;
+}
+
 void enableClusterSafe() { inCluster = true; }
 
 }} // namespace qpid::sys

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/ClusterSafe.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/ClusterSafe.h?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/ClusterSafe.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/ClusterSafe.h Tue Mar 15 01:54:07 2011
@@ -53,10 +53,8 @@ QPID_COMMON_EXTERN void assertClusterSaf
 QPID_COMMON_EXTERN bool isClusterSafe();
 
 /**
- * Base class for classes that encapsulate state which is replicated
- * to all members of a cluster. Acts as a marker for clustered state
- * and provides functions to assist detecting bugs in cluster
- * behavior.
+ *  Mark a scope as cluster safe. Sets isClusterSafe in constructor and resets
+ *  to previous value in destructor.
  */
 class ClusterSafeScope {
   public:
@@ -67,6 +65,18 @@ class ClusterSafeScope {
 };
 
 /**
+ *  Mark a scope as cluster unsafe. Clears isClusterSafe in constructor and resets
+ *  to previous value in destructor.
+ */
+class ClusterUnsafeScope {
+  public:
+    QPID_COMMON_EXTERN ClusterUnsafeScope();
+    QPID_COMMON_EXTERN ~ClusterUnsafeScope();
+  private:
+    bool save;
+};
+
+/**
  * Enable cluster-safe assertions. By default they are no-ops.
  * Called by cluster code.
  */

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/sys/SslPlugin.cpp Tue Mar 15 01:54:07 2011
@@ -95,7 +95,7 @@ static struct SslPlugin : public Plugin 
         // Only provide to a Broker
         if (broker) {
             if (options.certDbPath.empty()) {
-                QPID_LOG(info, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");                    
+                QPID_LOG(notice, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
             } else {
                 try {
                     ssl::initNSS(options, true);

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/types/Variant.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/types/Variant.cpp Tue Mar 15 01:54:07 2011
@@ -108,7 +108,6 @@ class VariantImpl
     } value;
     std::string encoding;//optional encoding for variable length data
 
-    std::string getTypeName(VariantType type) const;
     template<class T> T convertFromString() const
     {
         std::string* s = reinterpret_cast<std::string*>(value.v);
@@ -582,7 +581,7 @@ const std::string& VariantImpl::getStrin
 void VariantImpl::setEncoding(const std::string& s) { encoding = s; }
 const std::string& VariantImpl::getEncoding() const { return encoding; }
 
-std::string VariantImpl::getTypeName(VariantType type) const
+std::string getTypeName(VariantType type)
 {
     switch (type) {
       case VAR_VOID: return "void";

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am Tue Mar 15 01:54:07 2011
@@ -287,18 +287,6 @@ check_PROGRAMS+=datagen
 datagen_SOURCES=datagen.cpp
 datagen_LDADD=$(lib_common) $(lib_client)
 
-check_PROGRAMS+=qrsh_server
-qrsh_server_SOURCES=qrsh_server.cpp
-qrsh_server_LDADD=$(lib_client)
-
-check_PROGRAMS+=qrsh_run
-qrsh_run_SOURCES=qrsh_run.cpp
-qrsh_run_LDADD=$(lib_client)
-
-check_PROGRAMS+=qrsh
-qrsh_SOURCES=qrsh.cpp
-qrsh_LDADD=$(lib_client)
-
 check_PROGRAMS+=qpid-stream
 qpid_stream_INCLUDES=$(PUBLIC_INCLUDES)
 qpid_stream_SOURCES=qpid-stream.cpp

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/MessageUtils.h?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/MessageUtils.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/MessageUtils.h Tue Mar 15 01:54:07 2011
@@ -20,7 +20,6 @@
  */
 
 #include "qpid/broker/Message.h"
-#include "qpid/broker/AsyncCompletion.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/Uuid.h"
@@ -29,17 +28,6 @@ using namespace qpid;
 using namespace broker;
 using namespace framing;
 
-namespace {
-    class DummyCompletion : public AsyncCompletion
-    {
-  public:
-        DummyCompletion() {}
-        virtual ~DummyCompletion() {}
-  protected:
-        void completed(bool) {}
-    };
-}
-
 namespace qpid {
 namespace tests {
 
@@ -62,8 +50,6 @@ struct MessageUtils
         msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
         if (durable)
             msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
-        boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
-        msg->setIngressCompletion(dc);
         return msg;
     }
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/QueueTest.cpp?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/QueueTest.cpp Tue Mar 15 01:54:07 2011
@@ -88,8 +88,6 @@ intrusive_ptr<Message> create_message(st
     msg->getFrames().append(method);
     msg->getFrames().append(header);
     msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
-    boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
-    msg->setIngressCompletion(dc);
     return msg;
 }
 
@@ -637,7 +635,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
 
     Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
     intrusive_ptr<Message> received;
-    queue1->configure(args);
+    queue1->create(args);
 
     intrusive_ptr<Message> msg1 = create_message("e", "A");
     intrusive_ptr<Message> msg2 = create_message("e", "A");
@@ -709,9 +707,9 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNo
     args.setPersistLastNode();
 
     Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
-    queue1->configure(args);
+    queue1->create(args);
     Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
-    queue2->configure(args);
+    queue2->create(args);
 
     intrusive_ptr<Message> msg1 = create_message("e", "A");
 
@@ -797,7 +795,7 @@ not requeued to the store.
 
     Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
     intrusive_ptr<Message> received;
-    queue1->configure(args);
+    queue1->create(args);
 
     // check requeue 1
     intrusive_ptr<Message> msg1 = create_message("e", "C");



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message