qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r448881 - in /incubator/qpid/trunk/qpid: cpp/broker/inc/SessionHandlerImpl.h cpp/broker/src/Channel.cpp cpp/broker/src/SessionHandlerImpl.cpp python/tests/basic.py python/tests/broker.py
Date Fri, 22 Sep 2006 09:53:48 GMT
Author: gsim
Date: Fri Sep 22 02:53:47 2006
New Revision: 448881

URL: http://svn.apache.org/viewvc?view=rev&rev=448881
Log:
Added tests for basic_cancel and for handling of invalid channel ids.


Modified:
    incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/python/tests/basic.py
    incubator/qpid/trunk/qpid/python/tests/broker.py

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerImpl.h?view=diff&rev=448881&r1=448880&r2=448881
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerImpl.h Fri Sep 22 02:53:47 2006
@@ -87,6 +87,7 @@
     void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
     void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
 
+    Channel* getChannel(u_int16_t channel);
     /**
      * Get named queue, never returns 0.
      * @return: named queue or default queue for channel if name=""
@@ -96,7 +97,7 @@
     Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
 
     Exchange* findExchange(const string& name);
-
+    
   public:
     SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues, 
                        ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t
timeout);

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp?view=diff&rev=448881&r1=448880&r2=448881
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp Fri Sep 22 02:53:47 2006
@@ -57,11 +57,14 @@
 }
 
 void Channel::cancel(string& tag){
-    ConsumerImpl* c = consumers[tag];
-    if(c){
-        c->cancel();
-        consumers.erase(tag);
-        delete c;
+    consumer_iterator i = consumers.find(tag);
+    if(i != consumers.end()){
+        ConsumerImpl* c = i->second;
+        consumers.erase(i);
+        if(c){
+            c->cancel();
+            delete c;
+        }
     }
 }
 
@@ -69,9 +72,11 @@
     //cancel all consumers
     for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()
){
         ConsumerImpl* c = i->second;
-        c->cancel();
         consumers.erase(i);
-        delete c;
+        if(c){
+            c->cancel();
+            delete c;
+        }
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp?view=diff&rev=448881&r1=448880&r2=448881
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp Fri Sep 22 02:53:47 2006
@@ -55,10 +55,18 @@
     delete queueHandler;
 }
 
+Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
+    channel_iterator i = channels.find(channel);
+    if(i == channels.end()){
+        throw ConnectionException(504, "Unknown channel: " + channel);
+    }
+    return i->second;
+}
+
 Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){
     Queue::shared_ptr queue;
     if (name.empty()) {
-        queue = channels[channel]->getDefaultQueue();
+        queue = getChannel(channel)->getDefaultQueue();
         if (!queue) throw ConnectionException( 530, "Queue must be specified or previously
declared" );
     } else {
         queue = queues->find(name);
@@ -143,11 +151,11 @@
 }
 
 void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
-    channels[channel]->handleHeader(body, exchanges);
+    getChannel(channel)->handleHeader(body, exchanges);
 }
 
 void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
-    channels[channel]->handleContent(body, exchanges);
+    getChannel(channel)->handleContent(body, exchanges);
 }
 
 void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){
@@ -195,11 +203,13 @@
         
 void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t replyCode,
string& replyText, 
                                                    u_int16_t classId, u_int16_t methodId){
-    Channel* c = parent->channels[channel];
-    parent->channels.erase(channel);
-    c->close();
-    delete c;
-    parent->client.getChannel().closeOk(channel);
+    Channel* c = parent->getChannel(channel);
+    if(c){
+        parent->channels.erase(channel);
+        c->close();
+        delete c;
+        parent->client.getChannel().closeOk(channel);
+    }
 } 
         
 void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){} 
@@ -254,7 +264,7 @@
 	queue = queue_created.first;
 	assert(queue);
 	if (queue_created.second) { // This is a new queue
-	    parent->channels[channel]->setDefaultQueue(queue);
+	    parent->getChannel(channel)->setDefaultQueue(queue);
 	    //add default binding:
 	    parent->exchanges->getDefault()->bind(queue, name, 0);
 	    if(exclusive){
@@ -322,8 +332,8 @@
 void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize,
u_int16_t prefetchCount, bool global){
     //TODO: handle global
     //TODO: channel doesn't do anything with these qos parameters yet
-    parent->channels[channel]->setPrefetchSize(prefetchSize);
-    parent->channels[channel]->setPrefetchCount(prefetchCount);
+    parent->getChannel(channel)->setPrefetchSize(prefetchSize);
+    parent->getChannel(channel)->setPrefetchCount(prefetchCount);
     parent->client.getBasic().qosOk(channel);
 } 
         
@@ -353,7 +363,7 @@
 } 
         
 void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag,
bool nowait){
-    parent->channels[channel]->cancel(consumerTag);
+    parent->getChannel(channel)->cancel(consumerTag);
     if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag);
 } 
         
@@ -362,7 +372,7 @@
                                                    bool mandatory, bool immediate){
 
     Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate);
-    parent->channels[channel]->handlePublish(msg);
+    parent->getChannel(channel)->handlePublish(msg);
 } 
         
 void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string&
queue, bool noAck){} 

Modified: incubator/qpid/trunk/qpid/python/tests/basic.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/basic.py?view=diff&rev=448881&r1=448880&r2=448881
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/basic.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/basic.py Fri Sep 22 02:53:47 2006
@@ -113,3 +113,27 @@
         except Closed, e:
             self.assertConnectionException(530, e.args[0])
 
+    def test_basic_cancel(self):
+        """
+        Test compliance of the basic.cancel method
+        """
+        channel = self.channel
+        #setup, declare a queue:
+        channel.queue_declare(queue="test-queue-4", exclusive=True)
+        channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4")
+        channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
+
+        #cancel should stop messages being delivered
+        channel.basic_cancel(consumer_tag="my-consumer")
+        channel.basic_publish(routing_key="test-queue-4", content=Content("Two"))
+        myqueue = self.client.queue("my-consumer")
+        msg = myqueue.get(timeout=1)
+        self.assertEqual("One", msg.content.body)
+        try:
+            msg = myqueue.get(timeout=1) 
+            self.fail("Got message after cancellation: " + msg)
+        except Empty: None
+
+        #cancellation of non-existant consumers should be handled without error
+        channel.basic_cancel(consumer_tag="my-consumer")
+        channel.basic_cancel(consumer_tag="this-never-existed")

Modified: incubator/qpid/trunk/qpid/python/tests/broker.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/broker.py?view=diff&rev=448881&r1=448880&r2=448881
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/broker.py Fri Sep 22 02:53:47 2006
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from qpid.client import Closed
 from qpid.queue import Empty
 from qpid.content import Content
 from qpid.testlib import testrunner, TestBase
@@ -82,3 +83,20 @@
         msg = queue.get(timeout=5)
         self.assert_(msg.content.body == body)
 
+    def test_invalid_channel(self):
+        other = self.connect()
+        channel = other.channel(200)
+        try:
+            channel.queue_declare(exclusive=True)
+            self.fail("Expected error on queue_declare for invalid channel")
+        except Closed, e:
+            self.assertConnectionException(504, e.args[0])
+        
+        channel = self.client.channel(200)
+        channel.channel_open()
+        channel.channel_close()
+        try:
+            channel.queue_declare(exclusive=True)
+            self.fail("Expected error on queue_declare for closed channel")
+        except Closed, e:
+            self.assertConnectionException(504, e.args[0])



Mime
View raw message