qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cctriel...@apache.org
Subject svn commit: r710157 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/Exchange.cpp qpid/broker/Exchange.h tests/ExchangeTest.cpp
Date Mon, 03 Nov 2008 21:06:23 GMT
Author: cctrieloff
Date: Mon Nov  3 13:06:22 2008
New Revision: 710157

URL: http://svn.apache.org/viewvc?rev=710157&view=rev
Log:
Correction for Active-Active clustering: allow late-joining nodes in the cluster to sync
their counter values for sequenced messages.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=710157&r1=710156&r2=710157&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Nov  3 13:06:22 2008
@@ -152,7 +152,9 @@
     buffer.getShortString(type);
     buffer.get(args);
 
-    return exchanges.declare(name, type, durable, args).first;
+    Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first;
+    exch->sequenceNo = buffer.getInt64();
+    return exch;
 }
 
 void Exchange::encode(Buffer& buffer) const 
@@ -161,6 +163,7 @@
     buffer.putOctet(durable);
     buffer.putShortString(getType());
     buffer.put(args);
+    buffer.putInt64(sequenceNo);
 }
 
 uint32_t Exchange::encodedSize() const 
@@ -168,7 +171,8 @@
     return name.size() + 1/*short string size*/
         + 1 /*durable*/
         + getType().size() + 1/*short string size*/
-        + args.encodedSize(); 
+        + args.encodedSize()
+        + 8; /*int64 */ 
 }
 
 ManagementObject* Exchange::GetManagementObject (void) const

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=710157&r1=710156&r2=710157&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon Nov  3 13:06:22 2008
@@ -50,7 +50,7 @@
 protected:
     bool sequence;
     mutable qpid::sys::Mutex sequenceLock;
-    uint64_t sequenceNo;
+    int64_t sequenceNo;
     bool ive;
     boost::intrusive_ptr<Message> lastMsg;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=710157&r1=710156&r2=710157&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Mon Nov  3 13:06:22 2008
@@ -179,48 +179,65 @@
 {
     FieldTable args;
     args.setInt("qpid.msg_sequence",1);
-    
-    DirectExchange direct("direct1", false, args);
-    
-    intrusive_ptr<Message> msg1 = cmessage("e", "A");
-    intrusive_ptr<Message> msg2 = cmessage("e", "B");
-    intrusive_ptr<Message> msg3 = cmessage("e", "C");
-
-    DeliverableMessage dmsg1(msg1);
-    DeliverableMessage dmsg2(msg2);
-    DeliverableMessage dmsg3(msg3);
-
-    direct.route(dmsg1, "abc", 0);
-    direct.route(dmsg2, "abc", 0);
-    direct.route(dmsg3, "abc", 0);
+    char* buff = new char[10000];
+    framing::Buffer buffer(buff,10000);
+    {
+        DirectExchange direct("direct1", false, args);
+
+        intrusive_ptr<Message> msg1 = cmessage("e", "A");
+        intrusive_ptr<Message> msg2 = cmessage("e", "B");
+        intrusive_ptr<Message> msg3 = cmessage("e", "C");
+
+        DeliverableMessage dmsg1(msg1);
+        DeliverableMessage dmsg2(msg2);
+        DeliverableMessage dmsg3(msg3);
+
+        direct.route(dmsg1, "abc", 0);
+        direct.route(dmsg2, "abc", 0);
+        direct.route(dmsg3, "abc", 0);
+
+        BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+        FanOutExchange fanout("fanout1", false, args);
+        HeadersExchange header("headers1", false, args);
+        TopicExchange topic ("topic1", false, args);
+
+        // check other exchanges, that they preroute
+        intrusive_ptr<Message> msg4 = cmessage("e", "A");
+        intrusive_ptr<Message> msg5 = cmessage("e", "B");
+        intrusive_ptr<Message> msg6 = cmessage("e", "C");
+
+        DeliverableMessage dmsg4(msg4);
+        DeliverableMessage dmsg5(msg5);
+        DeliverableMessage dmsg6(msg6);
+
+        fanout.route(dmsg4, "abc", 0);
+        BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+        FieldTable headers;
+        header.route(dmsg5, "abc", &headers);
+        BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+        topic.route(dmsg6, "abc", 0);
+        BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        direct.encode(buffer);
+    }
+    {
+        
+        ExchangeRegistry exchanges;
+        buffer.reset();
+        DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer);
+        
+        intrusive_ptr<Message> msg1 = cmessage("e", "A");
+        DeliverableMessage dmsg1(msg1);
+        exch_dec->route(dmsg1, "abc", 0);
 
-    BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-    BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-    BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
     
-    FanOutExchange fanout("fanout1", false, args);
-    HeadersExchange header("headers1", false, args);
-    TopicExchange topic ("topic1", false, args);
-    
-    // check other exchanges, that they preroute
-    intrusive_ptr<Message> msg4 = cmessage("e", "A");
-    intrusive_ptr<Message> msg5 = cmessage("e", "B");
-    intrusive_ptr<Message> msg6 = cmessage("e", "C");
-
-    DeliverableMessage dmsg4(msg4);
-    DeliverableMessage dmsg5(msg5);
-    DeliverableMessage dmsg6(msg6);
-    
-    fanout.route(dmsg4, "abc", 0);
-    BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-    
-    FieldTable headers;
-    header.route(dmsg5, "abc", &headers);
-    BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-
-    topic.route(dmsg6, "abc", 0);
-    BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-
+    }
+    delete [] buff;
 }
 
 QPID_AUTO_TEST_CASE(testIVEOption) 



Mime
View raw message