qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1485511 - in /qpid/trunk/qpid/cpp/src: ./ qpid/ha/ tests/
Date Wed, 22 May 2013 23:52:17 GMT
Author: aconway
Date: Wed May 22 23:52:17 2013
New Revision: 1485511

URL: http://svn.apache.org/r1485511
Log:
QPID-4866: HA support for failover exchange

Add support for the "amq.failover" exchange with new HA, to support migration of
clients that used this facility with the old cluster.

Added:
    qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/ha.mk
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1485511&r1=1485510&r2=1485511&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed May 22 23:52:17 2013
@@ -642,9 +642,13 @@ if (BUILD_HA)
         qpid/ha/BrokerReplicator.h
         qpid/ha/ConnectionObserver.cpp
         qpid/ha/ConnectionObserver.h
+        qpid/ha/FailoverExchange.cpp
+        qpid/ha/FailoverExchange.h
         qpid/ha/HaBroker.cpp
         qpid/ha/HaBroker.h
         qpid/ha/HaPlugin.cpp
+	qpid/ha/makeMessage.cpp
+	qpid/ha/makeMessage.h
         qpid/ha/Membership.cpp
         qpid/ha/Membership.h
         qpid/ha/Primary.cpp

Modified: qpid/trunk/qpid/cpp/src/ha.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/ha.mk?rev=1485511&r1=1485510&r2=1485511&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/ha.mk (original)
+++ qpid/trunk/qpid/cpp/src/ha.mk Wed May 22 23:52:17 2013
@@ -36,6 +36,8 @@ ha_la_SOURCES =					\
   qpid/ha/HaBroker.cpp				\
   qpid/ha/HaBroker.h				\
   qpid/ha/HaPlugin.cpp				\
+  qpid/ha/makeMessage.cpp			\
+  qpid/ha/makeMessage.h				\
   qpid/ha/Membership.cpp			\
   qpid/ha/Membership.h				\
   qpid/ha/Primary.cpp				\

Added: qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp?rev=1485511&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp Wed May 22 23:52:17 2013
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FailoverExchange.h"
+#include "makeMessage.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/Array.h"
+#include "qpid/RefCounted.h"
+#include "qpid/UrlArray.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+using namespace broker;
+using namespace framing;
+using broker::amqp_0_10::MessageTransfer;
+
+const string FailoverExchange::typeName("amq.failover");
+
+namespace {
+struct OstreamUrls {
+    OstreamUrls(const FailoverExchange::Urls& u) : urls(u) {}
+    FailoverExchange::Urls urls;
+};
+
+ostream& operator<<(ostream& o, const OstreamUrls& urls) {
+    ostream_iterator<qpid::Url> out(o, " ");
+    copy(urls.urls.begin(), urls.urls.end(), out);
+    return o;
+}
+}
+
+FailoverExchange::FailoverExchange(management::Manageable& parent, Broker& b)
+    : Exchange(typeName, &parent, &b)
+{
+    QPID_LOG(debug, typeName << " created.");
+    if (mgmtExchange != 0)
+        mgmtExchange->set_type(typeName);
+}
+
+void FailoverExchange::setUrls(const vector<Url>& u) {
+    QPID_LOG(debug, typeName << " URLs set to " << OstreamUrls(u));
+    Lock l(lock);
+    urls = u;
+}
+
+void FailoverExchange::updateUrls(const vector<Url>& u) {
+    QPID_LOG(debug, typeName << " Updating URLs " << OstreamUrls(u) << " to "
+             << queues.size() << " subscribers.");
+    Lock l(lock);
+    urls=u;
+    if (!urls.empty() && !queues.empty()) {
+        for (Queues::const_iterator i = queues.begin(); i != queues.end(); ++i)
+            sendUpdate(*i, l);
+    }
+}
+
+string FailoverExchange::getType() const { return typeName; }
+
+bool FailoverExchange::bind(Queue::shared_ptr queue, const string&,
+                            const framing::FieldTable*) {
+    QPID_LOG(debug, typeName << " binding " << queue->getName());
+    Lock l(lock);
+    sendUpdate(queue, l);
+    return queues.insert(queue).second;
+}
+
+bool FailoverExchange::unbind(Queue::shared_ptr queue, const string&,
+                              const framing::FieldTable*) {
+    QPID_LOG(debug, typeName << " un-binding " << queue->getName());
+    Lock l(lock);
+    return queues.erase(queue);
+}
+
+bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const,
+                               const framing::FieldTable*) {
+    Lock l(lock);
+    return queues.find(queue) != queues.end();
+}
+
+void FailoverExchange::route(Deliverable&) {
+    QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring");
+}
+
+void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::ScopedLock&) {
+    QPID_LOG(debug, typeName << " sending " << OstreamUrls(urls) << " to " << queue->getName());
+    if (urls.empty()) return;
+    framing::Array array = vectorToUrlArray(urls);
+    const ProtocolVersion v;
+    broker::Message message(makeMessage(Buffer(), typeName));
+    MessageTransfer& transfer = MessageTransfer::get(message);
+    MessageProperties* props =
+        transfer.getFrames().getHeaders()->get<framing::MessageProperties>(true);
+    props->setContentLength(0);
+    props->getApplicationHeaders().setArray(typeName, array);
+    DeliverableMessage(message, 0).deliverTo(queue);
+}
+
+}} // namespace ha

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.h?rev=1485511&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.h Wed May 22 23:52:17 2013
@@ -0,0 +1,71 @@
+#ifndef QPID_HA_FAILOVEREXCHANGE_H
+#define QPID_HA_FAILOVEREXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *ls
+ */
+
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/Url.h"
+
+#include <vector>
+#include <set>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Failover exchange provides failover host list, as specified in AMQP 0-10.
+ */
+class FailoverExchange : public broker::Exchange
+{
+  public:
+    typedef std::vector<Url> Urls;
+
+    static const std::string typeName;
+
+    FailoverExchange(management::Manageable& parent, broker::Broker& b);
+
+    /** Set the URLs but don't send an update.*/
+    void setUrls(const Urls&);
+    /** Set the URLs and send an update.*/
+    void updateUrls(const Urls&);
+
+    // Exchange overrides
+    std::string getType() const;
+    bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
+    bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
+    bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args);
+    void route(broker::Deliverable& msg);
+
+  private:
+    void sendUpdate(const boost::shared_ptr<broker::Queue>&, sys::Mutex::ScopedLock&);
+
+    typedef sys::Mutex::ScopedLock Lock;
+    typedef std::set<boost::shared_ptr<broker::Queue> > Queues;
+
+    sys::Mutex lock;
+    Urls urls;
+    Queues queues;
+};
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_FAILOVEREXCHANGE_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1485511&r1=1485510&r2=1485511&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Wed May 22 23:52:17 2013
@@ -64,7 +64,8 @@ HaBroker::HaBroker(broker::Broker& b, co
       broker(b),
       observer(new ConnectionObserver(*this, systemId)),
       role(new StandAlone),
-      membership(BrokerInfo(systemId, STANDALONE), *this)
+      membership(BrokerInfo(systemId, STANDALONE), *this),
+      failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
 {
     // If we are joining a cluster we must start excluding clients now,
     // otherwise there's a window for a client to connect before we get to
@@ -74,6 +75,7 @@ HaBroker::HaBroker(broker::Broker& b, co
         shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
         observer->setObserver(excluder, "Backup: ");
         broker.getConnectionObservers().add(observer);
+        broker.getExchanges().registerExchange(failoverExchange);
     }
 }
 
@@ -171,7 +173,9 @@ void HaBroker::setPublicUrl(const Url& u
     mgmtObject->set_publicUrl(url.str());
     knownBrokers.clear();
     knownBrokers.push_back(url);
-    QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url);
+    vector<Url> urls(1, url);
+    failoverExchange->updateUrls(urls);
+    QPID_LOG(debug, role->getLogPrefix() << "Public URL set to: " << url);
 }
 
 void HaBroker::setBrokerUrl(const Url& url) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1485511&r1=1485510&r2=1485511&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Wed May 22 23:52:17 2013
@@ -27,6 +27,7 @@
 #include "types.h"
 #include "Settings.h"
 #include "qpid/Url.h"
+#include "FailoverExchange.h"
 #include "qpid/sys/Mutex.h"
 #include "qmf/org/apache/qpid/ha/HaBroker.h"
 #include "qpid/management/Manageable.h"
@@ -115,6 +116,7 @@ class HaBroker : public management::Mana
     boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
     boost::shared_ptr<Role> role;
     Membership membership;
+    boost::shared_ptr<FailoverExchange> failoverExchange;
 };
 }} // namespace qpid::ha
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1485511&r1=1485510&r2=1485511&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Wed May 22 23:52:17 2013
@@ -19,6 +19,7 @@
  *
  */
 
+#include "makeMessage.h"
 #include "QueueGuard.h"
 #include "QueueRange.h"
 #include "QueueReplicator.h"
@@ -41,6 +42,7 @@ using namespace framing;
 using namespace broker;
 using namespace std;
 using sys::Mutex;
+using broker::amqp_0_10::MessageTransfer;
 
 const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
 const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
@@ -289,7 +291,7 @@ void ReplicatingSubscription::acknowledg
 }
 
 // Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
+void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
 {
     if (dequeues.empty()) return;
     QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
@@ -300,7 +302,7 @@ void ReplicatingSubscription::sendDequeu
     buffer.reset();
     {
         Mutex::ScopedUnlock u(lock);
-        sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
+        sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
     }
 }
 
@@ -328,7 +330,7 @@ void ReplicatingSubscription::dequeued(S
 }
 
 // Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&)
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock& l)
 {
     if (pos == backupPosition) return; // No need to send.
     QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
@@ -338,39 +340,25 @@ void ReplicatingSubscription::sendPositi
     buffer.reset();
     {
         Mutex::ScopedUnlock u(lock);
-        sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
+        sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
     }
 }
 
-void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
+void ReplicatingSubscription::sendEvent(const std::string& key,
+                                        const framing::Buffer& buffer,
+                                        Mutex::ScopedLock&)
 {
-    //generate event message
-    boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> event(new qpid::broker::amqp_0_10::MessageTransfer());
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
-    AMQFrame header((AMQHeaderBody()));
-    AMQFrame content((AMQContentBody()));
-    content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
-    header.setBof(false);
-    header.setEof(false);
-    header.setBos(true);
-    header.setEos(true);
-    content.setBof(false);
-    content.setEof(true);
-    content.setBos(true);
-    content.setEos(true);
-    event->getFrames().append(method);
-    event->getFrames().append(header);
-    event->getFrames().append(content);
-
+    broker::Message message = makeMessage(buffer);
+    MessageTransfer& transfer = MessageTransfer::get(message);
     DeliveryProperties* props =
-        event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+        transfer.getFrames().getHeaders()->get<DeliveryProperties>(true);
     props->setRoutingKey(key);
-    // Send the event directly to the base consumer implementation.
-    //dummy consumer prevents acknowledgements being handled, which is what we want for events
-    ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr<Consumer>());
+    // Send the event directly to the base consumer implementation.  The dummy
+    // consumer prevents acknowledgements being handled, which is what we want
+    // for events
+    ConsumerImpl::deliver(QueueCursor(), message, boost::shared_ptr<Consumer>());
 }
 
-
 // Called in subscription's connection thread.
 bool ReplicatingSubscription::doDispatch()
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1485511&r1=1485510&r2=1485511&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Wed May 22 23:52:17 2013
@@ -132,7 +132,8 @@ class ReplicatingSubscription : public b
     void sendDequeueEvent(sys::Mutex::ScopedLock&);
     void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&);
     void setReady();
-    void sendEvent(const std::string& key, framing::Buffer&);
+    void sendEvent(const std::string& key, const framing::Buffer&,
+                   sys::Mutex::ScopedLock&);
   friend struct Factory;
 };
 

Added: qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp?rev=1485511&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp Wed May 22 23:52:17 2013
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "makeMessage.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+namespace qpid {
+namespace ha {
+
+broker::Message makeMessage(const framing::Buffer& buffer,
+                            const std::string& destination)
+{
+    using namespace framing;
+    using broker::amqp_0_10::MessageTransfer;
+
+    boost::intrusive_ptr<MessageTransfer> transfer(
+        new qpid::broker::amqp_0_10::MessageTransfer());
+    AMQFrame method((MessageTransferBody(ProtocolVersion(), destination, 0, 0)));
+    AMQFrame header((AMQHeaderBody()));
+    AMQFrame content((AMQContentBody()));
+    // AMQContentBody::decode is missing a const declaration, so cast it here.
+    content.castBody<AMQContentBody>()->decode(
+        const_cast<Buffer&>(buffer), buffer.getSize());
+    header.setBof(false);
+    header.setEof(false);
+    header.setBos(true);
+    header.setEos(true);
+    content.setBof(false);
+    content.setEof(true);
+    content.setBos(true);
+    content.setEos(true);
+    transfer->getFrames().append(method);
+    transfer->getFrames().append(header);
+    transfer->getFrames().append(content);
+    return broker::Message(transfer, 0);
+}
+
+}} // namespace qpid::ha

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h?rev=1485511&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h Wed May 22 23:52:17 2013
@@ -0,0 +1,43 @@
+#ifndef QPID_HA_MAKEMESSAGE_H
+#define QPID_HA_MAKEMESSAGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Message.h"
+
+namespace qpid {
+namespace framing {
+class Buffer;
+}
+namespace ha {
+
+/**
+ * Create internal messages used by HA components.
+ */
+broker::Message makeMessage(
+    const framing::Buffer& content,
+    const std::string& destination=std::string()
+);
+
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_MAKEMESSAGE_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1485511&r1=1485510&r2=1485511&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Wed May 22 23:52:17 2013
@@ -151,9 +151,8 @@ acl allow all all
 
     def promote(self): self.ready(); self.qpid_ha(["promote"])
     def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
-    def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
+    def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]);
     def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
-
     def agent(self):
         if not self._agent:
             cred = self.client_credentials
@@ -191,7 +190,7 @@ acl allow all all
         agent = self.agent()
         assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout)
 
-    # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+    # TODO aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):
         assert subprocess.call(
             [self.qpid_config_path, "--broker", self.host_port()]+args,
@@ -299,7 +298,7 @@ class HaCluster(object):
         """Start a new broker in the cluster"""
         i = len(self)
         assert i <= len(self._ports)
-        if i == len(self._ports):
+        if i == len(self._ports): # Adding new broker after cluster init
             self._ports.append(HaPort(self.test))
             self._set_url()
             self._update_urls()
@@ -311,10 +310,9 @@ class HaCluster(object):
         self.url = ",".join("127.0.0.1:%s"%(p.port) for p in self._ports)
 
     def _update_urls(self):
-        if len(self) > 1: # No failover addresses on a 1 cluster.
-            for b in self:
-                b.set_brokers_url(self.url)
-                b.set_public_url(self.url)
+        for b in self:
+            b.set_brokers_url(self.url)
+            b.set_public_url(self.url)
 
     def connect(self, i):
         """Connect with reconnect_urls"""

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1485511&r1=1485510&r2=1485511&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Wed May 22 23:52:17 2013
@@ -549,7 +549,7 @@ class ReplicationTests(HaBrokerTest):
 
     def test_auth(self):
         """Verify that authentication does not interfere with replication."""
-        # FIXME aconway 2012-07-09: generate test sasl config portably for cmake
+        # TODO aconway 2012-07-09: generate test sasl config portably for cmake
         sasl_config=os.path.join(self.rootdir, "sasl_config")
         if not os.path.exists(sasl_config):
             print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config
@@ -1183,6 +1183,24 @@ class ConfigurationTests(HaBrokerTest):
         b = start("none", "none")
         check(b, "", "")
 
+    def test_failover_exchange(self):
+        """Verify that the failover exchange correctly reports cluster membership"""
+
+        def strip_url(url): return re.sub('amqp:|tcp:', '', url)
+
+        def assert_url(m, url):
+            urls = m.properties['amq.failover']
+            self.assertEqual(1, len(urls))
+            self.assertEqual(strip_url(urls[0]), url)
+
+        cluster = HaCluster(self, 1, args=["--ha-public-url=foo:1234"])
+        r = cluster[0].connect().session().receiver("amq.failover")
+        assert_url(r.fetch(1), "foo:1234")
+        cluster[0].set_public_url("bar:1234")
+        assert_url(r.fetch(1), "bar:1234")
+        cluster[0].set_brokers_url(cluster.url+",xxx:1234")
+        self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL
+
 class StoreTests(BrokerTest):
     """Test for HA with persistence."""
 
@@ -1203,8 +1221,7 @@ class StoreTests(BrokerTest):
         r = cluster[0].connect().session().receiver("qq")
         self.assertEqual(r.fetch().content, "foo")
         r.session.acknowledge()
-        # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush
-        # the dequeue operation on qq.
+        # Sending this message is a hack to flush the dequeue operation on qq.
         s.send(Message("flush", durable=True))
 
         def verify(broker, x_count):



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message