qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1485001 - in /qpid/trunk/qpid: cpp/src/qpid/broker/Queue.cpp cpp/src/qpid/broker/QueueSettings.cpp cpp/src/qpid/broker/QueueSettings.h tests/src/py/qpid_tests/broker_0_10/new_api.py
Date Tue, 21 May 2013 22:35:58 GMT
Author: gsim
Date: Tue May 21 22:35:57 2013
New Revision: 1485001

URL: http://svn.apache.org/r1485001
Log:
QPID-4591: patch from Ernie Allen to add queue sequence number to messages

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1485001&r1=1485000&r2=1485001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue May 21 22:35:57 2013
@@ -755,6 +755,7 @@ void Queue::push(Message& message, bool 
     {
         Mutex::ScopedLock locker(messageLock);
         message.setSequence(++sequence);
+        if (settings.sequencing) message.addAnnotation(settings.sequenceKey, (uint32_t)sequence);
         messages->publish(message);
         listeners.populate(copy);
         observeEnqueue(message, locker);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1485001&r1=1485000&r2=1485001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp Tue May 21 22:35:57 2013
@@ -63,6 +63,7 @@ const std::string LVQ_LEGACY("qpid.last_
 const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
 const std::string LVQ_LEGACY_NOBROWSE("qpid.last_value_queue_no_browse");
 
+const std::string SEQUENCING("qpid.queue_msg_sequence");
 
 bool handleFairshareSetting(const std::string& basename, const std::string& key, const qpid::types::Variant& value, QueueSettings& settings)
 {
@@ -97,7 +98,8 @@ QueueSettings::QueueSettings(bool d, boo
     noLocal(false),
     isBrowseOnly(false),
     autoDeleteDelay(0),
-    alertRepeatInterval(60)
+    alertRepeatInterval(60),
+    sequencing(false)
 {}
 
 bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& value)
@@ -203,6 +205,10 @@ bool QueueSettings::handle(const std::st
     } else if (key == PAGE_FACTOR) {
         pageFactor = value;
         return true;
+    } else if (key == SEQUENCING) {
+        sequenceKey = value.getString();
+        sequencing = !sequenceKey.empty();
+        return true;
     } else if (key == FILTER) {
         filter = value.asString();
         return true;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h?rev=1485001&r1=1485000&r2=1485001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h Tue May 21 22:35:57 2013
@@ -83,6 +83,10 @@ struct QueueSettings
     uint64_t maxFileSize;
     uint64_t maxFileCount;
 
+    std::string sequenceKey;
+    // store bool to avoid testing string value
+    bool sequencing;
+
     std::string filter;
 
     //yuck, yuck

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py?rev=1485001&r1=1485000&r2=1485001&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py Tue May 21 22:35:57 2013
@@ -123,3 +123,83 @@ class GeneralTests(Base):
         self.assertEqual(rx_alt.available(), 0, "No further messages should be received via the alternate exchange")
 
         sess4.close()
+
+class SequenceNumberTests(Base):
+    """
+    Tests of ring queue sequence number
+    """
+
+    def fail(self, text=None):
+        if text:
+            print "Fail: %r" % text
+        assert None
+
+    def setup_connection(self):
+        return Connection.establish(self.broker, **self.connection_options())
+
+    def setup_session(self):
+        return self.conn.session()
+
+    def setup_sender(self, name="ring-sequence-queue", key="qpid.queue_msg_sequence"):
+        addr = "%s; {create:sender, delete:always, node: {x-declare: {arguments: {'qpid.queue_msg_sequence':'%s', 'qpid.policy_type':'ring', 'qpid.max_count':4}}}}"  % (name, key)
+        sender = self.ssn.sender(addr)
+        return sender
+
+    def test_create_sequence_queue(self):
+        """
+        Test a queue with sequencing can be created
+        """
+
+        #setup, declare a queue
+        try:
+            sender = self.setup_sender()
+        except:
+            self.fail("Unable to create ring queue with sequencing enabled")
+
+    def test_get_sequence_number(self):
+        """
+        Test retrieving sequence number for queues 
+        """
+
+        key = "k"
+        sender = self.setup_sender("ring-sequence-queue2", key=key)
+
+        # send and receive 1 message and test the sequence number
+        msg = Message()
+        sender.send(msg)
+
+        receiver = self.ssn.receiver("ring-sequence-queue2")
+        msg = receiver.fetch(1)
+        try:
+            seqNo = msg.properties[key]
+            if int(seqNo) != 1:
+                txt = "Unexpected sequence number. Should be 1. Received (%s)" % seqNo
+                self.fail(txt)
+        except:
+            txt = "Unable to get key (%s) from message properties" % key
+            self.fail(txt)
+        receiver.close()
+
+    def test_sequence_number_gap(self):
+        """
+        Test that sequence number for ring queues shows gaps when queue
+        messages are overwritten
+        """
+        key = "qpid.seq"
+        sender = self.setup_sender("ring-sequence-queue3", key=key)
+        receiver = self.ssn.receiver("ring-sequence-queue3")
+
+        msg = Message()
+        sender.send(msg)
+        msg = receiver.fetch(1)
+
+        # send 5 more messages to overflow the queue
+        for i in range(5):
+            sender.send(msg)
+
+        msg = receiver.fetch(1)
+        seqNo = msg.properties[key]
+        if int(seqNo) != 3:
+            txt = "Unexpected sequence number. Should be 3. Received (%s)" % seqNo
+            self.fail(txt)
+        receiver.close()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message