qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1070913 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/cluster/
Date Tue, 15 Feb 2011 14:17:45 GMT
Author: gsim
Date: Tue Feb 15 14:17:45 2011
New Revision: 1070913

URL: http://svn.apache.org/viewvc?rev=1070913&view=rev
Log:
QPID-3002: Configurable threshold alerts for queues

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1070913&r1=1070912&r2=1070913&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Feb 15 14:17:45 2011
@@ -1011,6 +1011,7 @@ set (qpidbroker_SOURCES
      qpid/broker/SessionHandler.h
      qpid/broker/SessionHandler.cpp
      qpid/broker/System.cpp
+     qpid/broker/ThresholdAlerts.cpp
      qpid/broker/TopicExchange.cpp
      qpid/broker/TxAccept.cpp
      qpid/broker/TxBuffer.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1070913&r1=1070912&r2=1070913&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Feb 15 14:17:45 2011
@@ -603,6 +603,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/QueueEvents.h \
   qpid/broker/QueueListeners.cpp \
   qpid/broker/QueueListeners.h \
+  qpid/broker/QueueObserver.h \
   qpid/broker/QueuePolicy.cpp \
   qpid/broker/QueuePolicy.h \
   qpid/broker/QueueRegistry.cpp \
@@ -649,6 +650,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/SignalHandler.h \
   qpid/broker/System.cpp \
   qpid/broker/System.h \
+  qpid/broker/ThresholdAlerts.cpp \
+  qpid/broker/ThresholdAlerts.h \
   qpid/broker/TopicExchange.cpp \
   qpid/broker/TopicExchange.h \
   qpid/broker/TransactionalStore.h \

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1070913&r1=1070912&r2=1070913&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Feb 15 14:17:45 2011
@@ -221,7 +221,6 @@ Broker::Broker(const Broker::Options& co
     }
 
     QueuePolicy::setDefaultMaxSize(conf.queueLimit);
-    queues.setQueueEvents(&queueEvents);
 
     // Early-Initialize plugins
     Plugin::earlyInitAll(*this);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1070913&r1=1070912&r2=1070913&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Feb 15 14:17:45 2011
@@ -31,6 +31,7 @@
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/ThresholdAlerts.h"
 
 #include "qpid/StringUtils.h"
 #include "qpid/log/Statement.h"
@@ -104,7 +105,6 @@ Queue::Queue(const string& _name, bool _
     policyExceeded(false),
     mgmtObject(0),
     eventMode(0),
-    eventMgr(0),
     insertSeqNo(0),
     broker(b),
     deleted(false),
@@ -168,7 +168,6 @@ void Queue::deliver(boost::intrusive_ptr
         }else {
             push(msg);
         }
-        mgntEnqStats(msg);
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
     }
 }
@@ -187,7 +186,6 @@ void Queue::recover(boost::intrusive_ptr
         msg->addToSyncList(shared_from_this(), store); 
     }
     msg->enqueueComplete(); // mark the message as enqueued
-    mgntEnqStats(msg);
 
     if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
         //content has not been loaded, need to ensure that lazy loading mode is set:
@@ -202,7 +200,6 @@ void Queue::recover(boost::intrusive_ptr
 
 void Queue::process(boost::intrusive_ptr<Message>& msg){
     push(msg);
-    mgntEnqStats(msg);
     if (mgmtObject != 0){
         mgmtObject->inc_msgTxnEnqueues ();
         mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -527,14 +524,7 @@ void Queue::push(boost::intrusive_ptr<Me
          
         dequeueRequired = messages->push(qm, removed);
         listeners.populate(copy);
-
-        if (eventMode) {
-            if (eventMgr) eventMgr->enqueued(qm);
-            else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
-        }
-        if (policy.get()) {
-            policy->enqueued(qm);
-        }
+        enqueued(qm);
     }
     copy.notify();
     if (dequeueRequired) {
@@ -717,8 +707,12 @@ void Queue::dequeued(const QueuedMessage
 {
     if (policy.get()) policy->dequeued(msg);
     mgntDeqStats(msg.payload);
-    if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
-        eventMgr->dequeued(msg);
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->dequeued(msg);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
+        }
     }
 }
 
@@ -736,12 +730,15 @@ void Queue::configure(const FieldTable& 
 {
 
     eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+    if (eventMode && broker) {
+        broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
+    }
 
     if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && 
-        (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+        (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
         if ( NullMessageStore::isNullStore(store)) {
             QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
-        } else if (eventMgr && !eventMgr->isSync() ) {
+        } else if (broker && !(broker->getQueueEvents().isSync()) ) {
             QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
         }
         FieldTable copy(_settings);
@@ -750,6 +747,10 @@ void Queue::configure(const FieldTable& 
     } else {
         setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
     }
+    if (broker && broker->getManagementAgent()) {
+        ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings);
+    }
+
     //set this regardless of owner to allow use of no-local with exclusive consumers also
     noLocal = _settings.get(qpidNoLocal);
     QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
@@ -1027,11 +1028,6 @@ SequenceNumber Queue::getPosition() {
 
 int Queue::getEventMode() { return eventMode; }
 
-void Queue::setQueueEventManager(QueueEvents& mgr)
-{
-    eventMgr = &mgr;
-}
-
 void Queue::recoveryComplete(ExchangeRegistry& exchanges)
 {
     // set the alternate exchange
@@ -1057,14 +1053,28 @@ void Queue::insertSequenceNumbers(const 
 
 void Queue::enqueued(const QueuedMessage& m)
 {
-    if (m.payload) {
-        if (policy.get()) {
-            policy->recoverEnqueued(m.payload);
-            policy->enqueued(m);
+    for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
+        try {
+            (*i)->enqueued(m);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
         }
-        mgntEnqStats(m.payload);
+    }
+    if (policy.get()) {
+        policy->enqueued(m);
+    }
+    mgntEnqStats(m.payload);
+}
+
+void Queue::updateEnqueued(const QueuedMessage& m)
+{
+    if (m.payload) {
         boost::intrusive_ptr<Message> payload = m.payload;
         enqueue ( 0, payload, true );
+        if (policy.get()) {
+            policy->recoverEnqueued(payload);
+        }
+        enqueued(m);
     } else {
         QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
     }
@@ -1086,6 +1096,11 @@ void Queue::checkNotDeleted()
     }
 }
 
+void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
+{
+    observers.insert(observer);
+}
+
 void Queue::flush()
 {
     ScopedUse u(barrier);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1070913&r1=1070912&r2=1070913&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Feb 15 14:17:45 2011
@@ -31,6 +31,7 @@
 #include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueBindings.h"
 #include "qpid/broker/QueueListeners.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/RateTracker.h"
 
 #include "qpid/framing/FieldTable.h"
@@ -47,6 +48,7 @@
 #include <vector>
 #include <memory>
 #include <deque>
+#include <set>
 #include <algorithm>
 
 namespace qpid {
@@ -86,6 +88,7 @@ class Queue : public boost::enable_share
         ~ScopedUse() { if (acquired) barrier.release(); }
     };
             
+    typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
     enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
 
 
@@ -117,7 +120,7 @@ class Queue : public boost::enable_share
     qmf::org::apache::qpid::broker::Queue* mgmtObject;
     RateTracker dequeueTracker;
     int eventMode;
-    QueueEvents* eventMgr;
+    Observers observers;
     bool insertSeqNo;
     std::string seqNoKey;
     Broker* broker;
@@ -136,11 +139,13 @@ class Queue : public boost::enable_share
 
     bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
+    void enqueued(const QueuedMessage& msg);
     void dequeued(const QueuedMessage& msg);
     void pop();
     void popAndDequeue();
     QueuedMessage getFront();
     void forcePersistent(QueuedMessage& msg);
+    int getEventMode();
 
     inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
     {
@@ -270,7 +275,7 @@ class Queue : public boost::enable_share
      * thus are still logically on the queue) - used in
      * clustered broker.  
      */ 
-    void enqueued(const QueuedMessage& msg);
+    void updateEnqueued(const QueuedMessage& msg);
 
     /**
      * Test whether the specified message (identified by its
@@ -331,8 +336,7 @@ class Queue : public boost::enable_share
     /** return current position sequence number for the next message on the queue.
      */
     QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
-    int getEventMode();
-    void setQueueEventManager(QueueEvents&);
+    void addObserver(boost::shared_ptr<QueueObserver>);
     QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
     /**
      * Notify queue that recovery has completed.

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=1070913&r1=1070912&r2=1070913&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp Tue Feb 15 14:17:45 2011
@@ -19,6 +19,8 @@
  *
  */
 #include "qpid/broker/QueueEvents.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
 
@@ -115,6 +117,29 @@ bool QueueEvents::isSync()
     return sync;
 }
 
+class EventGenerator : public QueueObserver
+{
+  public:
+    EventGenerator(QueueEvents& mgr, bool enqOnly) : manager(mgr), enqueueOnly(enqOnly) {}
+    void enqueued(const QueuedMessage& m)
+    {
+        manager.enqueued(m);
+    }
+    void dequeued(const QueuedMessage& m)
+    {
+        if (!enqueueOnly) manager.dequeued(m);
+    }
+  private:
+    QueueEvents& manager;
+    const bool enqueueOnly;
+};
+
+void QueueEvents::observe(Queue& queue, bool enqueueOnly)
+{
+    boost::shared_ptr<QueueObserver> observer(new EventGenerator(*this, enqueueOnly));
+    queue.addObserver(observer);
+}
+
 
 QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=1070913&r1=1070912&r2=1070913&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h Tue Feb 15 14:17:45 2011
@@ -63,6 +63,7 @@ class QueueEvents
     QPID_BROKER_EXTERN void unregisterListener(const std::string& id);
     void enable();
     void disable();
+    void observe(Queue&, bool enqueueOnly);
     //process all outstanding events
     QPID_BROKER_EXTERN void shutdown();
     QPID_BROKER_EXTERN bool isSync();

Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h?rev=1070913&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h Tue Feb 15 14:17:45 2011
@@ -0,0 +1,82 @@
+#ifndef QPID_BROKER_QUEUEOBSERVER_H
+#define QPID_BROKER_QUEUEOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace broker {
+
+class QueuedMessage;
+/**
+ * Interface for notifying classes who want to act as 'observers' of a
+ * queue of particular events.
+ */
+class QueueObserver
+{
+  public:
+    virtual void enqueued(const QueuedMessage&) = 0;
+    virtual void dequeued(const QueuedMessage&) = 0;
+  private:
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_QUEUEOBSERVER_H*/
+#ifndef QPID_BROKER_QUEUEOBSERVER_H
+#define QPID_BROKER_QUEUEOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace broker {
+
+class QueuedMessage;
+/**
+ * Interface for notifying classes who want to act as 'observers' of a
+ * queue of particular events.
+ */
+class QueueObserver
+{
+  public:
+    virtual void enqueued(const QueuedMessage&) = 0;
+    virtual void dequeued(const QueuedMessage&) = 0;
+  private:
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_QUEUEOBSERVER_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1070913&r1=1070912&r2=1070913&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Tue Feb 15 14:17:45 2011
@@ -47,7 +47,6 @@ QueueRegistry::declare(const string& dec
         Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
         queues[name] = queue;
         if (lastNode) queue->setLastNodeFailure();
-        if (events) queue->setQueueEventManager(*events);
 
         return std::pair<Queue::shared_ptr, bool>(queue, true);
     } else {
@@ -108,8 +107,3 @@ void QueueRegistry::updateQueueClusterSt
     }
     lastNode = _lastNode;
 }
-
-void QueueRegistry::setQueueEvents(QueueEvents* e)
-{
-    events = e;
-}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=1070913&r1=1070912&r2=1070913&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Tue Feb 15 14:17:45 2011
@@ -96,8 +96,6 @@ class QueueRegistry {
      */
     std::string generateName();
 
-    void setQueueEvents(QueueEvents*);
-
     /**
      * Set the store to use.  May only be called once.
      */

Added: qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1070913&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp Tue Feb 15 14:17:45 2011
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/ThresholdAlerts.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qmf/org/apache/qpid/broker/EventQueueThresholdExceeded.h"
+
+namespace qpid {
+namespace broker {
+ThresholdAlerts::ThresholdAlerts(const std::string& n,
+                                 qpid::management::ManagementAgent& a,
+                                 const uint32_t ct,
+                                 const uint64_t st,
+                                 const long repeat)
+    : name(n), agent(a), countThreshold(ct), sizeThreshold(st),
+      repeatInterval(repeat ? repeat*qpid::sys::TIME_SEC : 0),
+      count(0), size(0), lastAlert(qpid::sys::EPOCH) {}
+
+void ThresholdAlerts::enqueued(const QueuedMessage& m)
+{
+    size += m.payload->contentSize();
+    ++count;
+    if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
+        if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
+            || qpid::sys::Duration(lastAlert, qpid::sys::now()) > repeatInterval) {
+            agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
+            lastAlert = qpid::sys::now();
+        }
+    }
+}
+
+void ThresholdAlerts::dequeued(const QueuedMessage& m)
+{
+    size -= m.payload->contentSize();
+    --count;
+    if ((countThreshold && count < countThreshold) || (sizeThreshold && size < sizeThreshold)) {
+        lastAlert = qpid::sys::EPOCH;
+    }
+}
+
+
+
+void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
+                              const uint64_t countThreshold,
+                              const uint64_t sizeThreshold,
+                              const long repeatInterval)
+{
+    if (countThreshold || sizeThreshold) {
+        boost::shared_ptr<QueueObserver> observer(
+            new ThresholdAlerts(queue.getName(), agent, countThreshold, sizeThreshold, repeatInterval)
+        );
+        queue.addObserver(observer);
+    }
+}
+
+void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
+                              const qpid::framing::FieldTable& settings)
+
+{
+    qpid::types::Variant::Map map;
+    qpid::amqp_0_10::translate(settings, map);
+    observe(queue, agent, map);
+}
+
+template <class T>
+class Option
+{
+  public:
+    Option(const std::string& name, T d) : defaultValue(d) { names.push_back(name); }
+    void addAlias(const std::string& name) { names.push_back(name); }
+    T get(const qpid::types::Variant::Map& settings) const
+    {
+        T value(defaultValue);
+        for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
+            if (get(settings, *i, value)) break;
+        }
+        return value;
+    }
+  private:
+    std::vector<std::string> names;
+    T defaultValue;
+
+    bool get(const qpid::types::Variant::Map& settings, const std::string& name, T& value) const
+    {
+        qpid::types::Variant::Map::const_iterator i = settings.find(name);
+        if (i != settings.end()) {
+            try {
+                value = (T) i->second;
+            } catch (const qpid::types::InvalidConversion&) {
+                QPID_LOG(warning, "Bad value for" << name << ": " << i->second);
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+};
+
+void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
+                              const qpid::types::Variant::Map& settings)
+
+{
+    //Note: aliases are keys defined by java broker
+    Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
+    repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
+
+    //If no explicit threshold settings were given use 80% of any
+    //limit from the policy.
+    const QueuePolicy* policy = queue.getPolicy();
+    Option<uint32_t> countThreshold("qpid.alert_count", policy ? policy->getMaxCount()*0.8 : 0);
+    countThreshold.addAlias("x-qpid-maximum-message-count");
+    Option<uint64_t> sizeThreshold("qpid.alert_size", policy ? policy->getMaxSize()*0.8 : 0);
+    sizeThreshold.addAlias("x-qpid-maximum-message-size");
+
+    observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h?rev=1070913&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h Tue Feb 15 14:17:45 2011
@@ -0,0 +1,146 @@
+#ifndef QPID_BROKER_THRESHOLDALERTS_H
+#define QPID_BROKER_THRESHOLDALERTS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
+#include <string>
+
+namespace qpid {
+namespace framing {
+class FieldTable;
+}
+namespace management {
+class ManagementAgent;
+}
+namespace broker {
+
+class Queue;
+/**
+ * Class to manage generation of QMF alerts when particular thresholds
+ * are breached on a queue.
+ */
+class ThresholdAlerts : public QueueObserver
+{
+  public:
+    ThresholdAlerts(const std::string& name,
+                    qpid::management::ManagementAgent& agent,
+                    const uint32_t countThreshold,
+                    const uint64_t sizeThreshold,
+                    const long repeatInterval);
+    void enqueued(const QueuedMessage&);
+    void dequeued(const QueuedMessage&);
+    static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+                        const uint64_t countThreshold,
+                        const uint64_t sizeThreshold,
+                        const long repeatInterval);
+    static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+                        const qpid::framing::FieldTable& settings);
+    static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+                        const qpid::types::Variant::Map& settings);
+  private:
+    const std::string name;
+    qpid::management::ManagementAgent& agent;
+    const uint32_t countThreshold;
+    const uint64_t sizeThreshold;
+    const qpid::sys::Duration repeatInterval;
+    uint64_t count;
+    uint64_t size;
+    qpid::sys::AbsTime lastAlert;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_THRESHOLDALERTS_H*/
+#ifndef QPID_BROKER_THRESHOLDALERTS_H
+#define QPID_BROKER_THRESHOLDALERTS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
+#include <string>
+
+namespace qpid {
+namespace framing {
+class FieldTable;
+}
+namespace management {
+class ManagementAgent;
+}
+namespace broker {
+
+class Queue;
+/**
+ * Class to manage generation of QMF alerts when particular thresholds
+ * are breached on a queue.
+ */
+class ThresholdAlerts : public QueueObserver
+{
+  public:
+    ThresholdAlerts(const std::string& name,
+                    qpid::management::ManagementAgent& agent,
+                    const uint32_t countThreshold,
+                    const uint64_t sizeThreshold,
+                    const long repeatInterval);
+    void enqueued(const QueuedMessage&);
+    void dequeued(const QueuedMessage&);
+    static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+                        const uint64_t countThreshold,
+                        const uint64_t sizeThreshold,
+                        const long repeatInterval);
+    static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+                        const qpid::framing::FieldTable& settings);
+    static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+                        const qpid::types::Variant::Map& settings);
+  private:
+    const std::string name;
+    qpid::management::ManagementAgent& agent;
+    const uint32_t countThreshold;
+    const uint64_t sizeThreshold;
+    const qpid::sys::Duration repeatInterval;
+    uint64_t count;
+    uint64_t size;
+    qpid::sys::AbsTime lastAlert;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_THRESHOLDALERTS_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1070913&r1=1070912&r2=1070913&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Feb 15 14:17:45 2011
@@ -529,7 +529,7 @@ void Connection::deliveryRecord(const st
             m = getUpdateMessage();
             m.queue = queue.get();
             m.position = position;
-            if (enqueued) queue->enqueued(m); //inform queue of the message 
+            if (enqueued) queue->updateEnqueued(m); //inform queue of the message 
         } else {                // Message at original position in original queue
             m = queue->find(position);
         }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message