qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cctriel...@apache.org
Subject svn commit: r590523 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: ManagementAgent.cpp ManagementAgent.h ManagementExchange.cpp ManagementObject.h Queue.cpp Queue.h QueueRegistry.cpp QueueRegistry.h
Date Wed, 31 Oct 2007 00:43:12 GMT
Author: cctrieloff
Date: Tue Oct 30 17:43:11 2007
New Revision: 590523

URL: http://svn.apache.org/viewvc?rev=590523&view=rev
Log:
- QPID-667
- for Ted Ross 
- built and tested.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp?rev=590523&r1=590522&r2=590523&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp Tue Oct 30 17:43:11
2007
@@ -25,6 +25,7 @@
 #include <qpid/broker/Message.h>
 #include <qpid/broker/MessageDelivery.h>
 #include <qpid/framing/AMQFrame.h>
+#include <list>
 
 using namespace qpid::framing;
 using namespace qpid::broker;
@@ -57,7 +58,7 @@
 
 void ManagementAgent::clientAdded (void)
 {
-    for (ManagementObjectList::iterator iter = managementObjects.begin ();
+    for (ManagementObjectVector::iterator iter = managementObjects.begin ();
          iter != managementObjects.end ();
          iter++)
     {
@@ -74,6 +75,7 @@
     char      msgChars[BUFSIZE];
     Buffer    msgBuffer (msgChars, BUFSIZE);
     uint32_t  contentSize;
+    std::list<uint32_t> deleteList;
 
     if (managementObjects.empty ())
         return;
@@ -86,11 +88,9 @@
     msgBuffer.putOctet ('0');
     msgBuffer.putOctet ('1');
 
-    for (ManagementObjectList::iterator iter = managementObjects.begin ();
-         iter != managementObjects.end ();
-         iter++)
+    for (uint32_t idx = 0; idx < managementObjects.size (); idx++)
     {
-        ManagementObject::shared_ptr object = *iter;
+        ManagementObject::shared_ptr object = managementObjects[idx];
 
         if (object->getSchemaNeeded ())
         {
@@ -147,10 +147,7 @@
         }
 
         if (object->isDeleted ())
-        {
-            managementObjects.remove (object);
-            QPID_LOG (debug, "Management Object Removed");
-        }
+            deleteList.push_back (idx);
 
         // Temporary protection against buffer overrun.
         // This needs to be replaced with frame fragmentation.
@@ -189,5 +186,20 @@
 
     DeliverableMessage deliverable (msg);
     exchange->route (deliverable, "mgmt", 0);
+
+    // Delete flagged objects
+    for (std::list<uint32_t>::reverse_iterator iter = deleteList.rbegin ();
+         iter != deleteList.rend ();
+         iter++)
+    {
+        managementObjects.erase (managementObjects.begin () + *iter);
+    }
+    deleteList.clear ();
+}
+
+void ManagementAgent::dispatchCommand (Deliverable&      /*msg*/,
+                                       const string&     /*routingKey*/,
+                                       const FieldTable* /*args*/)
+{
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h?rev=590523&r1=590522&r2=590523&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h Tue Oct 30 17:43:11 2007
@@ -40,9 +40,12 @@
 
     ManagementAgent (uint16_t interval);
 
-    void setExchange  (Exchange::shared_ptr         exchange);
-    void addObject    (ManagementObject::shared_ptr object);
-    void clientAdded  (void);
+    void setExchange     (Exchange::shared_ptr         exchange);
+    void addObject       (ManagementObject::shared_ptr object);
+    void clientAdded     (void);
+    void dispatchCommand (Deliverable&      msg,
+                          const string&     routingKey,
+                          const FieldTable* args);
     
   private:
 
@@ -55,10 +58,10 @@
         void fire ();
     };
 
-    ManagementObjectList managementObjects;
-    Timer                timer;
-    Exchange::shared_ptr exchange;
-    uint16_t             interval;
+    ManagementObjectVector managementObjects;
+    Timer                  timer;
+    Exchange::shared_ptr   exchange;
+    uint16_t               interval;
 
     void PeriodicProcessing (void);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp?rev=590523&r1=590522&r2=590523&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp Tue Oct 30 17:43:11
2007
@@ -57,8 +57,7 @@
     if (routingKey.length () > 7 &&
         routingKey.substr (0, 7).compare ("method.") == 0)
     {
-        QPID_LOG (debug, "ManagementExchange: Intercept command " << routingKey);
-        // TODO: Send intercepted commands to ManagementAgent for dispatch
+        managementAgent->dispatchCommand (msg, routingKey, args);
         return;
     }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h?rev=590523&r1=590522&r2=590523&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h Tue Oct 30 17:43:11 2007
@@ -25,7 +25,7 @@
 #include "qpid/sys/Time.h"
 #include <qpid/framing/Buffer.h>
 #include <boost/shared_ptr.hpp>
-#include <list>
+#include <vector>
 
 namespace qpid { 
 namespace broker {
@@ -108,7 +108,7 @@
 
 };
 
- typedef std::list<ManagementObject::shared_ptr> ManagementObjectList;
+ typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector;
 
 }}
             

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=590523&r1=590522&r2=590523&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct 30 17:43:11 2007
@@ -71,18 +71,18 @@
     } else {
 
 
-	// if no store then mark as enqueued
+        // if no store then mark as enqueued
         if (!enqueue(0, msg)){
             push(msg);
-	    msg->enqueueComplete();
-	    if (mgmtObjectPtr != 0)
-	        mgmtObjectPtr->enqueue (msg->contentSize ());
-	}else {
-	    if (mgmtObjectPtr != 0)
-	        mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
+            msg->enqueueComplete();
+            if (mgmtObject != 0)
+                mgmtObject->enqueue (msg->contentSize ());
+        }else {
+            if (mgmtObject != 0)
+                mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
             push(msg);
-	}
-	QPID_LOG(debug, "Message " << msg << " enqueued on " << name <<
"[" << this << "]");
+        }
+        QPID_LOG(debug, "Message " << msg << " enqueued on " << name <<
"[" << this << "]");
         serializer.execute(dispatchCallback);
     }
 }
@@ -91,8 +91,8 @@
 void Queue::recover(Message::shared_ptr& msg){
     push(msg);
     msg->enqueueComplete(); // mark the message as enqueued
-    if (mgmtObjectPtr != 0)
-        mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
+    if (mgmtObject != 0)
+        mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
     if (store && !msg->isContentLoaded()) {
         //content has not been loaded, need to ensure that lazy loading mode is set:
         //TODO: find a nicer way to do this
@@ -108,20 +108,19 @@
         mask |= MSG_MASK_PERSIST;
 
     push(msg);
-    if (mgmtObjectPtr != 0)
-        mgmtObjectPtr->enqueue (msg->contentSize (), mask);
+    if (mgmtObject != 0)
+        mgmtObject->enqueue (msg->contentSize (), mask);
     serializer.execute(dispatchCallback);
    
 }
 
 void Queue::requeue(const QueuedMessage& msg){
     {
-    	Mutex::ScopedLock locker(messageLock);
-   	msg.payload->enqueueComplete(); // mark the message as enqueued
-    	messages.push_front(msg);
+        Mutex::ScopedLock locker(messageLock);
+        msg.payload->enqueueComplete(); // mark the message as enqueued
+        messages.push_front(msg);
     }
     serializer.execute(dispatchCallback);
-   
 }
 
 bool Queue::acquire(const QueuedMessage& msg) {
@@ -221,7 +220,7 @@
              QPID_LOG(debug, "Message " << msg.payload << " filtered out of "
<< name << "[" << this << "]");        
          } else {            
              break;
-         }	
+         }        
      }
      serviceAllBrowsers();
 }
@@ -290,8 +289,8 @@
         browsers.push_back(c);
     }
 
-    if (mgmtObjectPtr != 0){
-        mgmtObjectPtr->incConsumers ();
+    if (mgmtObject != 0){
+        mgmtObject->incConsumers ();
     }
 }
 
@@ -302,8 +301,8 @@
     } else {
         cancel(c, browsers);
     }
-    if (mgmtObjectPtr != 0){
-        mgmtObjectPtr->decConsumers ();
+    if (mgmtObject != 0){
+        mgmtObject->decConsumers ();
     }
     if(exclusive == c) exclusive.reset();
 }
@@ -318,17 +317,18 @@
 QueuedMessage Queue::dequeue(){
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg;
+
     if(!messages.empty()){
         msg = messages.front();
         pop();
-	if (mgmtObjectPtr != 0){
-	    uint32_t mask = 0;
+        if (mgmtObject != 0){
+            uint32_t mask = 0;
 
-	    if (msg.payload->isPersistent ())
-	        mask |= MSG_MASK_PERSIST;
+            if (msg.payload->isPersistent ())
+                mask |= MSG_MASK_PERSIST;
 
-	    mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask);
-	}
+            mgmtObject->dequeue (msg.payload->contentSize (), mask);
+        }
     }
     return msg;
 }
@@ -343,7 +343,7 @@
 void Queue::pop(){
     Mutex::ScopedLock locker(messageLock);
     if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
-    messages.pop_front();    
+    messages.pop_front();
 }
 
 void Queue::push(Message::shared_ptr& msg){
@@ -390,7 +390,7 @@
     if (msg->isPersistent() && store) {
         msg->enqueueAsync(this, store); //increment to async counter -- for message sent
to more than one queue
         store->enqueue(ctxt, *msg.get(), *this);
-	return true;
+        return true;
     }
     return false;
 }
@@ -401,7 +401,7 @@
     if (msg->isPersistent() && store) {
         msg->dequeueAsync(this, store); //increment to async counter -- for message sent
to more than one queue
         store->dequeue(ctxt, *msg.get(), *this);
-	return true;
+        return true;
     }
     return false;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=590523&r1=590522&r2=590523&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Oct 30 17:43:11 2007
@@ -95,7 +95,7 @@
             qpid::sys::Serializer<DispatchFunctor> serializer;
             DispatchFunctor dispatchCallback;
             framing::SequenceNumber sequence;
-	    ManagementObjectQueue::shared_ptr mgmtObjectPtr;
+            ManagementObjectQueue::shared_ptr mgmtObject;
 
             void pop();
             void push(Message::shared_ptr& msg);
@@ -103,7 +103,7 @@
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
             /**
              * only called by serilizer
-	     */
+             */
             void dispatch();
             void cancel(Consumer::ptr c, Consumers& set);
             void serviceAllBrowsers();
@@ -115,16 +115,16 @@
             bool exclude(Message::shared_ptr msg);
  
         protected:
-	   /**
-	   * Call back from store
-	   */
-  	   virtual void notifyDurableIOComplete();
+           /**
+            * Call back from store
+            */
+            virtual void notifyDurableIOComplete();
 
         public:
             typedef boost::shared_ptr<Queue> shared_ptr;
 
             typedef std::vector<shared_ptr> vector;
-	    
+
             Queue(const string& name, bool autodelete = false, 
                   MessageStore* const store = 0, 
                   const ConnectionToken* const owner = 0);
@@ -135,8 +135,8 @@
             void destroy();
             void bound(const string& exchange, const string& key, const qpid::framing::FieldTable&
args);
             void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref);
-            void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObjectPtr = mgmt;
}
-            ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObjectPtr; }
+            void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObject = mgmt; }
+            ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObject; }
 
             bool acquire(const QueuedMessage& msg);
 
@@ -165,7 +165,7 @@
              * Request dispatch any queued messages providing there are
              * consumers for them. Only one thread can be dispatching
              * at any time, so this call schedules the despatch based on
-	         * the serilizer policy.
+             * the serilizer policy.
              */
             void requestDispatch(Consumer::ptr c = Consumer::ptr());
             void flush(DispatchCompletion& callback);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=590523&r1=590522&r2=590523&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Tue Oct 30 17:43:11 2007
@@ -59,8 +59,7 @@
     }
 }
 
-void QueueRegistry::destroy(const string& name){
-    RWlock::ScopedWlock locker(lock);
+void QueueRegistry::destroyLH (const string& name){
     if (managementAgent){
         ManagementObjectQueue::shared_ptr mgmtObject;
         QueueMap::iterator i = queues.find(name);
@@ -72,6 +71,11 @@
     }
 
     queues.erase(name);
+}
+
+void QueueRegistry::destroy (const string& name){
+    RWlock::ScopedWlock locker(lock);
+    destroyLH (name);
 }
 
 Queue::shared_ptr QueueRegistry::find(const string& name){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=590523&r1=590522&r2=590523&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Tue Oct 30 17:43:11 2007
@@ -62,12 +62,13 @@
      * subsequent calls to find or declare with the same name.
      *
      */
-    void destroy(const string& name);
+    void destroyLH (const string& name);
+    void destroy   (const string& name);
     template <class Test> bool destroyIf(const string& name, Test test)
     {
         qpid::sys::RWlock::ScopedWlock locker(lock);
         if (test()) {
-            queues.erase(name);
+            destroyLH (name);
             return true;
         } else {
             return false;



Mime
View raw message