qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From c...@apache.org
Subject svn commit: r1485836 - in /qpid/trunk/qpid: cpp/src/qpid/broker/Broker.cpp cpp/src/qpid/broker/Broker.h cpp/src/qpid/broker/Queue.cpp cpp/src/qpid/broker/Queue.h cpp/src/tests/CMakeLists.txt specs/management-schema.xml tools/src/py/qpidtoollibs/broker.py
Date Thu, 23 May 2013 19:38:09 GMT
Author: chug
Date: Thu May 23 19:38:09 2013
New Revision: 1485836

URL: http://svn.apache.org/r1485836
Log:
QPID-4650: C++ Broker method to redirect messages between two queues.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
    qpid/trunk/qpid/specs/management-schema.xml
    qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1485836&r1=1485835&r2=1485836&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu May 23 19:38:09 2013
@@ -52,10 +52,13 @@
 #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogHiresTimestamp.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueRedirect.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
 #include "qmf/org/apache/qpid/broker/EventBind.h"
 #include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qmf/org/apache/qpid/broker/EventQueueRedirect.h"
+#include "qmf/org/apache/qpid/broker/EventQueueRedirectCancelled.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/management/ManagementDirectExchange.h"
 #include "qpid/management/ManagementTopicExchange.h"
@@ -576,6 +579,14 @@ Manageable::status_t Broker::ManagementM
         status = Manageable::STATUS_OK;
         break;
     }
+    case _qmf::Broker::METHOD_QUEUEREDIRECT:
+    {
+        string srcQueue(dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args).i_sourceQueue);
+        string tgtQueue(dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args).i_targetQueue);
+        QPID_LOG (debug, "Broker::queueRedirect source queue:" << srcQueue <<
" to target queue " << tgtQueue);
+        status =  queueRedirect(srcQueue, tgtQueue);
+        break;
+    }
     default:
         QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId
<< "]");
         status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -1046,6 +1057,120 @@ bool Broker::getLogHiresTimestamp()
 }
 
 
+Manageable::status_t Broker::queueRedirect(const std::string& srcQueue,
+                                           const std::string& tgtQueue)
+{
+    Queue::shared_ptr srcQ(queues.find(srcQueue));
+    if (!srcQ) {
+        QPID_LOG(error, "Queue redirect failed: source queue not found: "
+            << srcQueue);
+        return Manageable::STATUS_UNKNOWN_OBJECT;
+    }
+
+    if (!tgtQueue.empty()) {
+        // NonBlank target queue creates partnership
+        Queue::shared_ptr tgtQ(queues.find(tgtQueue));
+        if (!tgtQ) {
+            QPID_LOG(error, "Queue redirect failed: target queue not found: "
+                << tgtQueue);
+            return Manageable::STATUS_UNKNOWN_OBJECT;
+        }
+
+        if (srcQueue.compare(tgtQueue) == 0) {
+            QPID_LOG(error, "Queue redirect source queue: "
+                << tgtQueue << " cannot be its own target");
+            return Manageable::STATUS_USER;
+        }
+
+        if (srcQ->isAutoDelete()) {
+            QPID_LOG(error, "Queue redirect source queue: "
+                << srcQueue << " is autodelete and can not be part of redirect");
+            return Manageable::STATUS_USER;
+        }
+
+        if (tgtQ->isAutoDelete()) {
+            QPID_LOG(error, "Queue redirect target queue: "
+                << tgtQueue << " is autodelete and can not be part of redirect");
+            return Manageable::STATUS_USER;
+        }
+
+        if (srcQ->getRedirectPeer()) {
+            QPID_LOG(error, "Queue redirect source queue: "
+                << srcQueue << " is already redirected");
+            return Manageable::STATUS_USER;
+        }
+
+        if (tgtQ->getRedirectPeer()) {
+            QPID_LOG(error, "Queue redirect target queue: "
+                << tgtQueue << " is already redirected");
+            return Manageable::STATUS_USER;
+        }
+
+        // Start the backup overflow partnership
+        srcQ->setRedirectPeer(tgtQ, true);
+        tgtQ->setRedirectPeer(srcQ, false);
+
+        // Set management state
+        srcQ->setMgmtRedirectState(tgtQueue, true, true);
+        tgtQ->setMgmtRedirectState(srcQueue, true, false);
+
+        // Management event
+        if (managementAgent.get()) {
+            managementAgent->raiseEvent(_qmf::EventQueueRedirect(srcQueue, tgtQueue));
+        }
+
+        QPID_LOG(info, "Queue redirect complete. queue: "
+            << srcQueue << " target queue: " << tgtQueue);
+        return Manageable::STATUS_OK;
+    } else {
+        // Blank target queue destroys partnership
+        Queue::shared_ptr tgtQ(srcQ->getRedirectPeer());
+        if (!tgtQ) {
+            QPID_LOG(error, "Queue redirect source queue: "
+                << srcQueue << " is not in redirected");
+            return Manageable::STATUS_USER;
+        }
+
+        if (!srcQ->isRedirectSource()) {
+            QPID_LOG(error, "Queue redirect source queue: "
+                << srcQueue << " is not a redirect source");
+            return Manageable::STATUS_USER;
+        }
+
+        queueRedirectDestroy(srcQ, tgtQ, true);
+
+        return Manageable::STATUS_OK;
+    }
+}
+
+
+void Broker::queueRedirectDestroy(Queue::shared_ptr srcQ,
+                                  Queue::shared_ptr tgtQ,
+                                  bool moveMsgs) {
+    QPID_LOG(notice, "Queue redirect destroyed. queue: " << srcQ->getName()
+        << " target queue: " << tgtQ->getName());
+
+    tgtQ->setMgmtRedirectState(empty, false, false);
+    srcQ->setMgmtRedirectState(empty, false, false);
+
+    if (moveMsgs) {
+        // TODO: this 'move' works in the static case but has no
+        // actual locking that does what redirect needs when
+        // there is a lot of traffic in flight.
+        tgtQ->move(srcQ, 0);
+    }
+
+    Queue::shared_ptr np;
+
+    tgtQ->setRedirectPeer(np, false);
+    srcQ->setRedirectPeer(np, false);
+
+    if (managementAgent.get()) {
+        managementAgent->raiseEvent(_qmf::EventQueueRedirectCancelled(srcQ->getName(),
tgtQ->getName()));
+    }
+}
+
+
 const Broker::TransportInfo& Broker::getTransportInfo(const std::string& name) const
{
     static TransportInfo nullTransportInfo;
     TransportMap::const_iterator i
@@ -1135,7 +1260,6 @@ bool Broker::deferDeliveryImpl(const std
 
 const std::string Broker::TCP_TRANSPORT("tcp");
 
-
 std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
     const std::string& name,
     const QueueSettings& settings,
@@ -1210,6 +1334,11 @@ void Broker::deleteQueue(const std::stri
         if (check) check(queue);
         if (acl)
             acl->recordDestroyQueue(name);
+        Queue::shared_ptr peerQ(queue->getRedirectPeer());
+        if (peerQ)
+            queueRedirectDestroy(queue->isRedirectSource() ? queue : peerQ,
+                                 queue->isRedirectSource() ? peerQ : queue,
+                                 false);
         queues.destroy(name, connectionId, userId);
         queue->destroyed();
     } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1485836&r1=1485835&r2=1485836&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu May 23 19:38:09 2013
@@ -165,6 +165,8 @@ class Broker : public sys::Runnable, pub
                                             const ConnectionState* context);
     Manageable::status_t setTimestampConfig(const bool receive,
                                             const ConnectionState* context);
+    Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string&
tgtQueue);
+    void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue>
tgtQ, bool moveMsgs);
     boost::shared_ptr<sys::Poller> poller;
     std::auto_ptr<sys::Timer> timer;
     Options config;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1485836&r1=1485835&r2=1485836&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu May 23 19:38:09 2013
@@ -138,6 +138,7 @@ QueueSettings merge(const QueueSettings&
 
 }
 
+
 Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q) : message(m),
queue(q), prepared(false) {}
 bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw()
 {
@@ -186,7 +187,8 @@ Queue::Queue(const string& _name, const 
     broker(b),
     deleted(false),
     barrier(*this),
-    allocator(new FifoDistributor( *messages ))
+    allocator(new FifoDistributor( *messages )),
+    redirectSource(false)
 {
     if (settings.maxDepth.hasCount()) current.setCount(0);
     if (settings.maxDepth.hasSize()) current.setSize(0);
@@ -267,6 +269,15 @@ bool Queue::accept(const Message& msg)
 
 void Queue::deliver(Message msg, TxBuffer* txn)
 {
+    if (redirectPeer) {
+        redirectPeer->deliverTo(msg, txn);
+    } else {
+        deliverTo(msg, txn);
+    }
+}
+
+void Queue::deliverTo(Message msg, TxBuffer* txn)
+{
     if (accept(msg)) {
         if (txn) {
             TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
@@ -1123,6 +1134,7 @@ void Queue::unbind(ExchangeRegistry& exc
     bindings.unbind(exchanges, shared_from_this());
 }
 
+
 uint64_t Queue::getPersistenceId() const
 {
     return persistenceId;
@@ -1626,5 +1638,19 @@ void Queue::addArgument(const string& ke
     if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap());
 }
 
+
+void Queue::setRedirectPeer ( Queue::shared_ptr peer, bool isSrc) {
+    Mutex::ScopedLock locker(messageLock);
+    redirectPeer = peer;
+    redirectSource = isSrc;
+}
+
+void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) {
+    if (mgmtObject != 0) {
+        mgmtObject->set_redirectPeer(enabled ? peer : "");
+        mgmtObject->set_redirectSource(isSrc);
+    }
+}
+
 }}
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1485836&r1=1485835&r2=1485836&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu May 23 19:38:09 2013
@@ -82,6 +82,9 @@ class Queue : public boost::enable_share
               public PersistableQueue, public management::Manageable {
   public:
     typedef boost::function1<bool, const Message&> MessagePredicate;
+
+    typedef boost::shared_ptr<Queue> shared_ptr;
+
   protected:
     struct UsageBarrier
     {
@@ -169,6 +172,10 @@ class Queue : public boost::enable_share
     boost::shared_ptr<MessageDistributor> allocator;
     boost::scoped_ptr<Selector> selector;
 
+    // Redirect source and target refer to each other. Only one is source.
+    Queue::shared_ptr redirectPeer;
+    bool redirectSource;
+
     virtual void push(Message& msg, bool isRecovery=false);
     bool accept(const Message&);
     void process(Message& msg);
@@ -202,8 +209,6 @@ class Queue : public boost::enable_share
 
   public:
 
-    typedef boost::shared_ptr<Queue> shared_ptr;
-
     typedef std::vector<shared_ptr> vector;
 
     QPID_BROKER_EXTERN Queue(const std::string& name,
@@ -250,10 +255,16 @@ class Queue : public boost::enable_share
     QPID_BROKER_EXTERN bool dequeueMessageAt(const qpid::framing::SequenceNumber& position);
 
     /**
+     * Delivers a message to the queue or to overflow partner.
+     */
+    QPID_BROKER_EXTERN void deliver(Message, TxBuffer* = 0);
+    /**
      * Delivers a message to the queue. Will record it as
      * enqueued if persistent then process it.
      */
-    QPID_BROKER_EXTERN void deliver(Message, TxBuffer* = 0);
+  private:
+    QPID_BROKER_EXTERN void deliverTo(Message, TxBuffer* = 0);
+  public:
     /**
      * Returns a message to the in-memory queue (due to lack
      * of acknowledegement from a receiver). If a consumer is
@@ -428,6 +439,14 @@ class Queue : public boost::enable_share
     /** Add an argument to be included in management messages about this queue. */
     QPID_BROKER_EXTERN void addArgument(const std::string& key, const types::Variant&
value);
 
+    /**
+     * Atomic Redirect
+     */
+    QPID_BROKER_EXTERN void setRedirectPeer ( Queue::shared_ptr peer, bool isSrc );
+    QPID_BROKER_EXTERN Queue::shared_ptr getRedirectPeer() { return redirectPeer; }
+    QPID_BROKER_EXTERN bool isRedirectSource() const { return redirectSource; }
+    QPID_BROKER_EXTERN void setMgmtRedirectState( std::string peer, bool enabled, bool isSrc
);
+
   friend class QueueFactory;
 };
 }

Modified: qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt?rev=1485836&r1=1485835&r2=1485836&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt Thu May 23 19:38:09 2013
@@ -349,6 +349,7 @@ endif (BUILD_MSSQL)
 if (BUILD_MSCLFS)
   add_test (store_tests_clfs ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_store_tests${test_script_suffix}
MSSQL-CLFS)
 endif (BUILD_MSCLFS)
+add_test (queue_redirect ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_queue_redirect${test_script_suffix})
 endif (PYTHON_EXECUTABLE)
 
 add_library(test_store MODULE test_store.cpp)

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1485836&r1=1485835&r2=1485836&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Thu May 23 19:38:09 2013
@@ -189,6 +189,10 @@
       <arg name="logHires" dir="I" type="bool"  desc="True to enable enable high resolution
timestamp in logs."/>
     </method>
 
+    <method name="queueRedirect" desc="Enable/disable delivery redirect for indicated
queues">
+        <arg name="sourceQueue" dir="I" type="sstr"   desc="Source queue."/>
+        <arg name="targetQueue" dir="I" type="sstr"   desc="Redirect target queue. Blank
disables redirect."/>
+    </method>
 
   </class>
 
@@ -278,6 +282,9 @@
     <statistic name="flowStopped"         type="bool"     desc="Flow control active."/>
     <statistic name="flowStoppedCount"    type="count32"  desc="Number of times flow control
was activated for this queue"/>
 
+   <statistic name="redirectPeer"         type="sstr"     desc="Partner queue for redirected
pair"/>
+   <statistic name="redirectSource"       type="bool"     desc="This queue is the redirect
source"/>
+
     <method name="purge" desc="Discard all or some messages on a queue">
       <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for
n messages"/>
       <arg name="filter"  dir="I" type="map"    desc="if specified, purge only those messages
matching this filter"/>
@@ -546,6 +553,7 @@
     <arg name="reason"  type="lstr"   desc="Reason for a failure"/>
     <arg name="rhost"   type="sstr"   desc="Address (i.e. DNS name, IP address, etc.)
of a remotely connected host"/>
     <arg name="user"    type="sstr"   desc="Authentication identity"/>
+    <arg name="qTarget" type="sstr"   desc="Redirect target queue"/>
     <arg name="msgDepth" type="count64" desc="Current size of queue in messages"/>
     <arg name="byteDepth" type="count64" desc="Current size of queue in bytes"/>
     <arg name="properties" type="map" desc="optional identifying information sent by the
remote"/>
@@ -566,6 +574,8 @@
   <event name="unsubscribe"       sev="inform" args="rhost, user, dest"/>
   <event name="queueThresholdCrossedUpward"   sev="inform" args="qName, msgDepth, byteDepth"/>
   <event name="queueThresholdCrossedDownward" sev="inform" args="qName, msgDepth, byteDepth"/>
+  <event name="queueRedirect"          sev="inform" args="qName, qTarget"/>
+  <event name="queueRedirectCancelled" sev="inform" args="qName, qTarget"/>
 
   <!-- The following are deprecated -->
   <event name="queueThresholdExceeded" sev="warn" args="qName, msgDepth, byteDepth"/>

Modified: qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py?rev=1485836&r1=1485835&r2=1485836&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py (original)
+++ qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py Thu May 23 19:38:09 2013
@@ -292,6 +292,11 @@ class BrokerAgent(object):
             'routingKey':   key}
     return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
 
+  def Redirect(self, sourceQueue, targetQueue):
+    args = {'sourceQueue': sourceQueue,
+            'targetQueue': targetQueue}
+    return self._method('queueRedirect', args, "org.apache.qpid.broker:broker:amqp-broker")
+
   def create(self, _type, name, properties={}, strict=False):
     """Create an object of the specified type"""
     args = {'type': _type,



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message