qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r751760 - in /qpid/trunk/qpid/cpp: src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/
Date Mon, 09 Mar 2009 17:03:41 GMT
Author: aconway
Date: Mon Mar  9 17:03:40 2009
New Revision: 751760

URL: http://svn.apache.org/viewvc?rev=751760&view=rev
Log:
Fix cluster TTL: replicte expiry information to newcomers.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/src/tests/start_cluster
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Mar  9 17:03:40 2009
@@ -111,7 +111,6 @@
     decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
     discarding(true),
     state(INIT),
-    frameId(0),
     lastSize(0),
     lastBroker(false)
 {
@@ -267,9 +266,6 @@
     }
     else if (state >= CATCHUP) {
         QPID_LOG(trace, *this << " DLVR:  " << e);
-        EventFrame ef(e);       // Non-const copy
-        if (ef.type == DATA)         // Add cluster-id to to data frames.
-            ef.frame.setClusterId(frameId++);  
         ConnectionPtr connection = getConnection(e.connectionId, l);
         if (connection)
             connection->deliveredFrame(e);
@@ -475,18 +471,16 @@
     cs.password = settings.password;
     cs.mechanism = settings.mechanism;
     updateThread = Thread(
-        new UpdateClient(self, updatee, url, broker, map, frameId, getConnections(l), decoder,
+        new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l),
decoder,
                          boost::bind(&Cluster::updateOutDone, this),
                          boost::bind(&Cluster::updateOutError, this, _1),
                          cs));
 }
 
 // Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t frameId_) {
+void Cluster::updateInDone(const ClusterMap& m) {
     Lock l(lock);
     updatedMap = m;
-    // Safe to set frameId here because we are stalled: deliveredFrame cannot be called concurrently.
-    frameId = frameId_;
     checkUpdateIn(l);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Mar  9 17:03:40 2009
@@ -92,7 +92,7 @@
     void leave();
 
     // Update completed - called in update thread
-    void updateInDone(const ClusterMap&, uint64_t frameId);
+    void updateInDone(const ClusterMap&);
 
     MemberId getId() const;
     broker::Broker& getBroker() const;
@@ -108,6 +108,8 @@
     // Called only during update by Connection::shadowReady
     Decoder& getDecoder() { return decoder; }
 
+    ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
+    
   private:
     typedef sys::Monitor::ScopedLock Lock;
 
@@ -115,8 +117,6 @@
     typedef PollableQueue<EventFrame> PollableFrameQueue;
     typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap;
 
-    // FIXME aconway 2009-03-07: sort functions by thread
-
     // NB: A dummy Lock& parameter marks functions that must only be
     // called with Cluster::lock  locked.
  
@@ -237,7 +237,6 @@
     } state;
 
     ConnectionMap connections;
-    uint64_t frameId;
     ClusterMap map;
     ClusterMap::Set elders;
     size_t lastSize;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Mar  9 17:03:40 2009
@@ -279,9 +279,9 @@
     cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
 }
 
-void Connection::membership(const FieldTable& joiners, const FieldTable& members,
uint64_t frameId) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members)
{
     QPID_LOG(debug, cluster << " incoming update complete on connection " <<
*this);
-    cluster.updateInDone(ClusterMap(joiners, members), frameId);
+    cluster.updateInDone(ClusterMap(joiners, members));
     self.second = 0;        // Mark this as completed update connection.
 }
 
@@ -352,6 +352,10 @@
     q->setPosition(position);
 }
 
+void Connection::expiryId(uint64_t id) {
+    cluster.getExpiryPolicy().setId(id);
+}
+
 std::ostream& operator<<(std::ostream& o, const Connection& c) {
     const char* type="unknown";
     if (c.isLocal()) type = "local";

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Mar  9 17:03:40 2009
@@ -120,7 +120,7 @@
     
     void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username,
const std::string& fragment);
 
-    void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t
frameId);
+    void membership(const framing::FieldTable&, const framing::FieldTable&);
 
     void deliveryRecord(const std::string& queue,
                         const framing::SequenceNumber& position,
@@ -135,6 +135,7 @@
                         uint32_t credit);
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);
+    void expiryId(uint64_t);
 
     void txStart();
     void txAccept(const framing::SequenceSet&);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp Mon Mar  9 17:03:40 2009
@@ -33,7 +33,9 @@
 }
 
 std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
-    return o << e.frame  << " " << e.type << " " << e.connectionId
<< " read-credit=" << e.readCredit;
+    return o << e.frame  << " " << e.type << " " << e.connectionId;
+    if (e.readCredit) o << " read-credit=" << e.readCredit;
+    return o;
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Mon Mar  9 17:03:40 2009
@@ -31,46 +31,45 @@
 namespace cluster {
 
 ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer&
t)
-    : expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
-
-namespace {
-uint64_t clusterId(const broker::Message& m) {
-    assert(m.getFrames().begin() != m.getFrames().end());
-    return m.getFrames().begin()->getClusterId();
-}
+    : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
 
 struct ExpiryTask : public broker::TimerTask {
     ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime
when)
-        : TimerTask(when), expiryPolicy(policy), messageId(id) {}
-    void fire() { expiryPolicy->sendExpire(messageId); }
+        : TimerTask(when), expiryPolicy(policy), expiryId(id) {}
+    void fire() { expiryPolicy->sendExpire(expiryId); }
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-    const uint64_t messageId;
+    const uint64_t expiryId;
 };
-}
 
 void ExpiryPolicy::willExpire(broker::Message& m) {
-    timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration()));
+    uint64_t id = expiryId++;
+    assert(unexpiredById.find(id) == unexpiredById.end());
+    assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+    unexpiredById[id] = &m;
+    unexpiredByMessage[&m] = id;
+    timer.add(new ExpiryTask(this, id, m.getExpiration()));
 }
 
 bool ExpiryPolicy::hasExpired(broker::Message& m) {
-    sys::Mutex::ScopedLock l(lock);
-    IdSet::iterator i = expired.find(clusterId(m));
-    if (i != expired.end()) {
-        expired.erase(i);
-        const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired()
== true; 
-        return true;
-    }
-    return false;
+    return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
 }
 
 void ExpiryPolicy::sendExpire(uint64_t id) {
-    sys::Mutex::ScopedLock l(lock);
     mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id),
memberId);
 }
 
 void ExpiryPolicy::deliverExpire(uint64_t id) {
-    sys::Mutex::ScopedLock l(lock);
-    expired.insert(id);
+    IdMessageMap::iterator i = unexpiredById.find(id);
+    if (i != unexpiredById.end()) {
+        i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; 
+        unexpiredByMessage.erase(i->second);
+        unexpiredById.erase(i);
+    }
+}
+
+boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+    MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+    return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
 }
 
 bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Mon Mar  9 17:03:40 2009
@@ -27,11 +27,15 @@
 #include "qpid/sys/Mutex.h"
 #include <boost/function.hpp>
 #include <boost/intrusive_ptr.hpp>
-#include <set>
+#include <boost/optional.hpp>
+#include <map>
 
 namespace qpid {
 
-namespace broker { class Timer; }
+namespace broker {
+class Timer;
+class Message;
+}
 
 namespace cluster {
 class Multicaster;
@@ -54,16 +58,23 @@
     // Cluster delivers expiry notice.
     void deliverExpire(uint64_t);
 
+    void setId(uint64_t id) { expiryId = id; }
+    uint64_t getId() const { return expiryId; }
+    
+    boost::optional<uint64_t> getId(broker::Message&);
+    
   private:
-    sys::Mutex lock;
-    typedef std::set<uint64_t> IdSet;
+    typedef std::map<broker::Message*,  uint64_t> MessageIdMap;
+    typedef std::map<uint64_t, broker::Message*> IdMessageMap;
 
     struct Expired : public broker::ExpiryPolicy {
         bool hasExpired(broker::Message&);
         void willExpire(broker::Message&);
     };
 
-    IdSet expired;
+    MessageIdMap unexpiredByMessage;
+    IdMessageMap unexpiredById;
+    uint64_t expiryId;
     boost::intrusive_ptr<Expired> expiredPolicy;
     Multicaster& mcast;
     MemberId memberId;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Mar  9 17:03:40 2009
@@ -23,6 +23,7 @@
 #include "ClusterMap.h"
 #include "Connection.h"
 #include "Decoder.h"
+#include "ExpiryPolicy.h"
 #include "qpid/client/SessionBase_0_10Access.h" 
 #include "qpid/client/ConnectionAccess.h" 
 #include "qpid/broker/Broker.h"
@@ -87,14 +88,14 @@
 // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
 
 UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const
Url& url,
-                           broker::Broker& broker, const ClusterMap& m, uint64_t
frameId_, 
+                           broker::Broker& broker, const ClusterMap& m, ExpiryPolicy&
expiry_, 
                            const Cluster::ConnectionVector& cons, Decoder& decoder_,
                            const boost::function<void()>& ok,
                            const boost::function<void(const std::exception&)>&
fail,
                            const client::ConnectionSettings& cs
 )
     : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
-      frameId(frameId_), connections(cons), decoder(decoder_),
+      expiry(expiry_), connections(cons), decoder(decoder_),
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
       done(ok), failed(fail), connectionSettings(cs)
 {
@@ -129,9 +130,9 @@
 
     std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection,
this, _1));
 
+    ClusterConnectionProxy(session).expiryId(expiry.getId());
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
-    membership.setFrameId(frameId);
     AMQFrame frame(membership);
     client::ConnectionAccess::getImpl(connection)->handle(frame);
     connection.close();
@@ -150,8 +151,7 @@
 
 void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
     QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
-    ClusterConnectionProxy proxy(session);
-    proxy.exchange(encode(*ex));
+    ClusterConnectionProxy(session).exchange(encode(*ex));
 }
 
 /** Bind a queue to the update exchange and update messges to it
@@ -162,10 +162,11 @@
     bool haveLastPos;
     framing::SequenceNumber lastPos;
     client::AsyncSession session;
-
+    ExpiryPolicy& expiry;
+    
   public:
 
-    MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false),
session(s) {
+    MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_)
: queue(q), haveLastPos(false), session(s), expiry(expiry_) {
         session.exchangeBind(queue, UpdateClient::UPDATE);
     }
 
@@ -181,11 +182,20 @@
 
 
     void updateQueuedMessage(const broker::QueuedMessage& message) {
+        // Send the queue position if necessary.
         if (!haveLastPos || message.position - lastPos != 1)  {
             ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
             haveLastPos = true;
         }
         lastPos = message.position;
+
+        // Send the expiry ID if necessary.
+        if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
+            boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
+            if (!expiryId) return; // Message already expired, don't replicate.
+            ClusterConnectionProxy(session).expiryId(*expiryId);
+        }
+
         SessionBase_0_10Access sb(session);
         framing::MessageTransferBody transfer(
             framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE,
message::ACQUIRE_MODE_PRE_ACQUIRED);
@@ -214,7 +224,7 @@
     QPID_LOG(debug, updaterId << " updating queue " << q->getName());
     ClusterConnectionProxy proxy(session);
     proxy.queue(encode(*q));
-    MessageUpdater updater(q->getName(), session);
+    MessageUpdater updater(q->getName(), session, expiry);
     q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater,
_1));
     q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(),
_1));
 }
@@ -323,7 +333,7 @@
         // If the message is acquired then it is no longer on the
         // updatees queue, put it on the update queue for updatee to pick up.
         //
-        MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage());
+        MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
     }
     ClusterConnectionProxy(shadowSession).deliveryRecord(
         dr.getQueue()->getName(),
@@ -342,8 +352,8 @@
 
 class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
   public:
-    TxOpUpdater(UpdateClient& dc, client::AsyncSession s)
-        : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {}
+    TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
+        : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s)
{}
 
     void operator()(const broker::DtxAck& ) {
         throw InternalErrorException("DTX transactions not currently supported by cluster.");
@@ -386,7 +396,7 @@
     broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
     if (txBuffer) {
         proxy.txStart();
-        TxOpUpdater updater(*this, shadowSession);
+        TxOpUpdater updater(*this, shadowSession, expiry);
         txBuffer->accept(updater);
         proxy.txEnd();
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Mon Mar  9 17:03:40 2009
@@ -56,6 +56,7 @@
 class Connection;
 class ClusterMap;
 class Decoder;
+class ExpiryPolicy;
 
 /**
  * A client that updates the contents of a local broker to a remote one using AMQP.
@@ -65,7 +66,7 @@
     static const std::string UPDATE; // Name for special update queue and exchange.
     
     UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
-                 broker::Broker& donor, const ClusterMap& map, uint64_t frameId,
+                 broker::Broker& donor, const ClusterMap& map, ExpiryPolicy&
expiry,
                  const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
                  const boost::function<void()>& done,
                  const boost::function<void(const std::exception&)>& fail,
@@ -94,7 +95,7 @@
     Url updateeUrl;
     broker::Broker& updaterBroker;
     ClusterMap map;
-    uint64_t frameId;
+    ExpiryPolicy& expiry;
     std::vector<boost::intrusive_ptr<Connection> > connections;
     Decoder& decoder;
     client::Connection connection, shadowConnection;

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Mon Mar  9 17:03:40 2009
@@ -35,7 +35,6 @@
     subchannel=0;
     channel=0;
     encodedSizeCache = 0;
-    clusterId = 0;
 }
 
 AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init();
}

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Mon Mar  9 17:03:40 2009
@@ -92,9 +92,6 @@
     /** Must point to at least DECODE_SIZE_MIN bytes of data */
     static uint16_t decodeSize(char* data);
 
-    uint64_t getClusterId() const { return clusterId; }
-    void setClusterId(uint64_t id) { clusterId = id; }
-    
   private:
     void init();
 
@@ -106,7 +103,6 @@
     bool bos : 1;
     bool eos : 1;
     mutable uint32_t encodedSizeCache;
-    uint64_t clusterId;         // Used to identify frames in a clustered broekr.
 };
 
 std::ostream& operator<<(std::ostream&, const AMQFrame&);

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Mar  9 17:03:40 2009
@@ -151,7 +151,10 @@
     c.subs.subscribe(lq, q, browseSettings);
     vector<string> result;
     for (int i = 0; i < n; ++i) {
-        result.push_back(lq.get(TIMEOUT).getData());
+        Message m;
+        if (!lq.get(m, TIMEOUT))
+            break;
+        result.push_back(m.getData());
     }
     c.subs.getSubscription(q).cancel();
     return result;
@@ -202,13 +205,23 @@
     ClusterFixture cluster(2);
     Client c0(cluster[0], "c0");
     Client c1(cluster[1], "c1");
+    c0.session.queueDeclare("p");
     c0.session.queueDeclare("q");
     c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
     c0.session.messageTransfer(arg::content=Message("b", "q"));
-    BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<string>("a")("b"));
-    sys::usleep(300*1000); 
+    c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000));
+    c0.session.messageTransfer(arg::content=Message("y", "p"));
+    cluster.add();
+    Client c2(cluster[1], "c2");
+
+    BOOST_CHECK_EQUAL(browse(c0, "p", 2), list_of<string>("x")("y"));
+    BOOST_CHECK_EQUAL(browse(c1, "p", 2), list_of<string>("x")("y"));
+    BOOST_CHECK_EQUAL(browse(c2, "p", 2), list_of<string>("x")("y"));
+
+    sys::usleep(200*1000); 
     BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b"));
     BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b"));
+    BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b"));
 }
 
 QPID_AUTO_TEST_CASE(testSequenceOptions) {

Modified: qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ qpid/trunk/qpid/cpp/src/tests/start_cluster Mon Mar  9 17:03:40 2009
@@ -28,7 +28,6 @@
     echo $* | newgrp ais
 }
 
-test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
 rm -f cluster*.log
 SIZE=${1:-1}; shift
 CLUSTER=`pwd`		# Cluster name=pwd, avoid clashes.

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Mar  9 17:03:40 2009
@@ -132,7 +132,6 @@
     <control name="membership" code="0x21" label="Cluster membership details.">
       <field name="joiners" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
-      <field name="frame-id" type="uint64"/>> <!-- Frame id counter value -->
     </control>
 
     <!-- Set the position of a replicated queue. -->
@@ -140,11 +139,12 @@
       <field name="queue" type="str8"/>
       <field name="position" type="sequence-no"/>
     </control>
-    
+
     <!-- Replicate encoded exchanges/queues. -->
     <control name="exchange" code="0x31"><field name="encoded" type="str32"/></control>
     <control name="queue" code="0x32"><field name="encoded" type="str32"/></control>
-    
 
+    <!-- Set expiry-id for subsequent messages. -->
+    <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
   </class>
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message