qpid-commits mailing list archives

From acon...@apache.org
Subject svn commit: r448624 - in /incubator/qpid/trunk/qpid: cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/test/ cpp/common/ cpp/common/framing/generated/ python/qpid/ python/tests/
Date Thu, 21 Sep 2006 18:26:32 GMT
Author: aconway
Date: Thu Sep 21 11:26:31 2006
New Revision: 448624

URL: http://svn.apache.org/viewvc?view=rev&rev=448624
Log:
Implemented topic pattern matching for the TopicExchange.
Corrected default bindings to use the exchange named "" rather than
"amq.direct".
Added python and unit tests for all of the above.
Minor improvements to testlib.py, also some tests for testlib itself.
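
Illustration (not part of this commit): a minimal, self-contained sketch of
the matching rules the log refers to, assuming routing keys are split into
words on "." and that "*" stands for exactly one word while "#" stands for
zero or more words. The names split, matchFrom, topicMatches and exampleUsage
are hypothetical; the committed implementation is Tokens/TopicPattern in
TopicExchange.cpp.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

typedef std::vector<std::string> Words;

// Split a key on '.'. An empty key yields no words; empty words are kept,
// so "x." becomes {"x", ""} (the behaviour TokensTest expects).
Words split(const std::string& s) {
    Words out;
    if (s.empty()) return out;
    std::string::size_type start = 0;
    for (;;) {
        std::string::size_type dot = s.find('.', start);
        if (dot == std::string::npos) {
            out.push_back(s.substr(start));
            return out;
        }
        out.push_back(s.substr(start, dot - start));
        start = dot + 1;
    }
}

// Match pattern words from index p against topic words from index t.
// "*" consumes exactly one word, "#" consumes zero or more words.
bool matchFrom(const Words& pat, std::size_t p,
               const Words& topic, std::size_t t) {
    if (p == pat.size()) return t == topic.size();
    if (pat[p] == "#") {
        // Try every possible number of words for "#" to absorb.
        for (std::size_t skip = t; skip <= topic.size(); ++skip)
            if (matchFrom(pat, p + 1, topic, skip)) return true;
        return false;
    }
    if (t == topic.size()) return false;
    if (pat[p] == "*" || pat[p] == topic[t])
        return matchFrom(pat, p + 1, topic, t + 1);
    return false;
}

bool topicMatches(const std::string& pattern, const std::string& topic) {
    return matchFrom(split(pattern), 0, split(topic), 0);
}

// Hypothetical usage, mirroring a few cases from TopicExchangeTest.cpp.
void exampleUsage() {
    assert(topicMatches("a.#.b", "a.x.y.b"));
    assert(!topicMatches("a.*.b", "a.b"));
}
```

The sketch copies the words on every call for clarity; the TODO in
TopicExchange.cpp notes that a zero-copy matcher would be preferable.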


Added:
    incubator/qpid/trunk/qpid/cpp/broker/test/TopicExchangeTest.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/Makefile
    incubator/qpid/trunk/qpid/cpp/README
    incubator/qpid/trunk/qpid/cpp/bin/   (props changed)
    incubator/qpid/trunk/qpid/cpp/broker/Makefile
    incubator/qpid/trunk/qpid/cpp/broker/inc/DirectExchange.h
    incubator/qpid/trunk/qpid/cpp/broker/inc/Exchange.h
    incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeRegistry.h
    incubator/qpid/trunk/qpid/cpp/broker/inc/FanOutExchange.h
    incubator/qpid/trunk/qpid/cpp/broker/inc/TopicExchange.h
    incubator/qpid/trunk/qpid/cpp/broker/src/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/TopicExchange.cpp
    incubator/qpid/trunk/qpid/cpp/client/Makefile
    incubator/qpid/trunk/qpid/cpp/client/test/   (props changed)
    incubator/qpid/trunk/qpid/cpp/common/Makefile
    incubator/qpid/trunk/qpid/cpp/common/framing/generated/   (props changed)
    incubator/qpid/trunk/qpid/cpp/options.mk
    incubator/qpid/trunk/qpid/python/qpid/testlib.py
    incubator/qpid/trunk/qpid/python/tests/exchange.py
    incubator/qpid/trunk/qpid/python/tests/queue.py

Modified: incubator/qpid/trunk/qpid/cpp/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/Makefile?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/Makefile (original)
+++ incubator/qpid/trunk/qpid/cpp/Makefile Thu Sep 21 11:26:31 2006
@@ -20,20 +20,15 @@
 # build them in the correct sequence.
 #
 
-include options.mk
-
 UNITTESTS=$(wildcard common/*/test/*.so broker/test/*.so)
 
 .PHONY: all clean doxygen
 
 test:   all
-	@$(MAKE) -C common test
-	@$(MAKE) -C broker test
-	@$(MAKE) -C client test
 	@$(MAKE) runtests
 
 runtests: 
-	$(CPPUNIT_HOME)/bin/DllPlugInTester -t -b $(UNITTESTS)
+	DllPlugInTester -c -b $(UNITTESTS)
 	bin/qpidd >> qpidd.log &
 	cd ../python ; ./run-tests -v -I cpp_failing.txt	
 

Modified: incubator/qpid/trunk/qpid/cpp/README
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/README?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/README (original)
+++ incubator/qpid/trunk/qpid/cpp/README Thu Sep 21 11:26:31 2006
@@ -24,6 +24,17 @@
 DllPlugInTester is provided as part of cppunit. You can use it to run
 any subset of the unit tests. See Makefile for examples.
 
+NOTE: If foobar.so is a test plugin in the current directory then
+surprisingly this will fail with "can't load plugin":
+ DllPlugInTester foobar.so
+
+Instead you need to say:
+ DllPlugInTester ./foobar.so
+
+Reason: DllPlugInTester uses dlopen() which searches for shlibs
+in the standard places unless the filename contains a "/".  In
+that case it just tries to open the filename.
+
 === System tests ===
 
 The Python test suite ../python/run_tests is the main set of broker
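
Illustration (not part of this commit): the dlopen() rule in the README note
above can be handled once in a small helper, sketched here under the
assumption that plugin names are passed around as std::string. The function
name pluginPath is hypothetical.

```cpp
#include <cassert>
#include <string>

// Return a name that dlopen() will treat as a plain file path.
// dlopen() searches the standard library locations only when the name
// contains no '/', so a bare file name gets "./" prepended.
std::string pluginPath(const std::string& file) {
    assert(!file.empty());
    if (file.find('/') != std::string::npos) return file;
    return "./" + file;
}
```

For example, pluginPath("foobar.so") gives "./foobar.so", while a name that
already contains a directory such as "broker/test/foobar.so" is returned
unchanged.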

Propchange: incubator/qpid/trunk/qpid/cpp/bin/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Sep 21 11:26:31 2006
@@ -0,0 +1 @@
+qpidd

Modified: incubator/qpid/trunk/qpid/cpp/broker/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/Makefile?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/Makefile (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/Makefile Thu Sep 21 11:26:31 2006
@@ -27,11 +27,9 @@
 EXE_OBJECTS= src/Broker.o
 
 
-.PHONY: all clean test
+.PHONY: all clean 
 
 all: $(BROKER)
-
-test:
 	@$(MAKE) -C test all
 
 clean:

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/DirectExchange.h?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/DirectExchange.h Thu Sep 21 11:26:31 2006
@@ -29,22 +29,19 @@
 namespace qpid {
 namespace broker {
     class DirectExchange : public virtual Exchange{
-        const string name;
         std::map<string, std::vector<Queue::shared_ptr> > bindings;
         qpid::concurrent::MonitorImpl lock;
 
     public:
         static const std::string typeName;
         
-        DirectExchange(const string& name);
+        DirectExchange(const std::string& name);
         
-        inline virtual const string& getName(){ return name; }
-        
-        virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+        virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
 
-        virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+        virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
 
-        virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+        virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
 
         virtual ~DirectExchange();
     };

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Exchange.h?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Exchange.h Thu Sep 21 11:26:31 2006
@@ -25,12 +25,14 @@
 namespace qpid {
 namespace broker {
     class Exchange{
-    public:
-        virtual const string& getName() = 0;
+        const std::string name;
+      public:
+        explicit Exchange(const std::string& name) : name(name) {}
+        virtual ~Exchange(){}
+        std::string getName() { return name; }
         virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
         virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
         virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
-        virtual ~Exchange(){}
     };
 }
 }

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeRegistry.h?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeRegistry.h Thu Sep 21 11:26:31 2006
@@ -25,13 +25,15 @@
 namespace qpid {
 namespace broker {
     class ExchangeRegistry{
-        std::map<string, Exchange*> exchanges;
+        typedef std::map<string, Exchange*> ExchangeMap;
+        ExchangeMap exchanges;
         qpid::concurrent::Monitor* lock;
     public:
         ExchangeRegistry();
         void declare(Exchange* exchange);
         void destroy(const string& name);
         Exchange* get(const string& name);
+        Exchange* getDefault();
         inline qpid::concurrent::Monitor* getLock(){ return lock; }
         ~ExchangeRegistry();
     };

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/FanOutExchange.h?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/FanOutExchange.h Thu Sep 21 11:26:31 2006
@@ -30,22 +30,19 @@
 namespace broker {
 
 class FanOutExchange : public virtual Exchange {
-    const string name;
     std::vector<Queue::shared_ptr> bindings;
     qpid::concurrent::MonitorImpl lock;
 
   public:
     static const std::string typeName;
         
-    FanOutExchange(const string& name);
+    FanOutExchange(const std::string& name);
         
-    inline virtual const string& getName(){ return name; }
+    virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
 
-    virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+    virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
 
-    virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
-
-    virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+    virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
 
     virtual ~FanOutExchange();
 };

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/TopicExchange.h?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/TopicExchange.h Thu Sep 21 11:26:31 2006
@@ -18,7 +18,7 @@
 #ifndef _TopicExchange_
 #define _TopicExchange_
 
-#include <map>
+#include <tr1/unordered_map>
 #include <vector>
 #include "Exchange.h"
 #include "FieldTable.h"
@@ -28,28 +28,67 @@
 
 namespace qpid {
 namespace broker {
-    class TopicExchange : public virtual Exchange{
-        const string name;
-        std::map<string, std::vector<Queue::shared_ptr> > bindings;//NOTE: pattern matching not yet supported
-        qpid::concurrent::MonitorImpl lock;
 
-    public:
-        static const std::string typeName;
-        
-        TopicExchange(const string& name);
+/** A vector of string tokens */
+class Tokens : public std::vector<std::string> {
+  public:
+    Tokens() {};
+    // Default copy, assign, dtor are sufficient.
+
+    /** Tokenize s, provides automatic conversion of string to Tokens */
+    Tokens(const std::string& s) { operator=(s); }
+    /** Tokenize s */
+    Tokens & operator=(const std::string& s);
+
+    struct Hash { size_t operator()(const Tokens&) const; };
+    typedef std::equal_to<Tokens> Equal;
+};
+
+/**
+ * Tokens that have been normalized as a pattern and can be matched
+ * with topic Tokens.  Normalized means all sequences of mixed * and
+ * # are reduced to a series of * followed by at most one #.
+ */
+class TopicPattern : public Tokens
+{
+  public:
+    TopicPattern() {}
+    // Default copy, assign, dtor are sufficient.
+    TopicPattern(const Tokens& tokens) { operator=(tokens); }
+    TopicPattern(const std::string& str) { operator=(str); }
+    TopicPattern& operator=(const Tokens&);
+    TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); }
+    
+    /** Match a topic */
+    bool match(const std::string& topic) { return match(Tokens(topic)); }
+    bool match(const Tokens& topic) const;
+
+  private:
+    void normalize();
+};
+
+class TopicExchange : public virtual Exchange{
+    typedef std::tr1::unordered_map<TopicPattern, Queue::vector, TopicPattern::Hash> BindingMap;
+    BindingMap bindings;
+    qpid::concurrent::MonitorImpl lock;
+
+  public:
+    static const std::string typeName;
+
+    TopicExchange(const string& name);
         
-        inline virtual const string& getName(){ return name; }
+    virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
 
-        virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+    virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+    virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+    virtual ~TopicExchange();
+};
 
-        virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
 
-        virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
 
-        virtual ~TopicExchange();
-    };
 }
 }
-
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/DirectExchange.cpp?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/DirectExchange.cpp Thu Sep 21 11:26:31 2006
@@ -22,7 +22,7 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DirectExchange::DirectExchange(const string& _name) : name(_name) {
+DirectExchange::DirectExchange(const string& name) : Exchange(name) {
 
 }
 
@@ -59,7 +59,7 @@
         (*i)->deliver(msg);
     }
     if(!count){
-        std::cout << "WARNING: DirectExchange " << name << " could not route message with key " << routingKey << std::endl;
+        std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
     }
     lock.release();
 }

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeRegistry.cpp?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeRegistry.cpp Thu Sep 21 11:26:31 2006
@@ -24,6 +24,10 @@
 ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
 
 ExchangeRegistry::~ExchangeRegistry(){
+    for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end(); ++i)
+    {
+        delete i->second;
+    }
     delete lock;
 }
 
@@ -40,4 +44,14 @@
 
 Exchange* ExchangeRegistry::get(const string& name){
     return exchanges[name];
+}
+
+namespace 
+{
+const std::string empty;
+}
+
+Exchange* ExchangeRegistry::getDefault()
+{
+    return get(empty);
 }

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/FanOutExchange.cpp?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/FanOutExchange.cpp Thu Sep 21 11:26:31 2006
@@ -23,7 +23,7 @@
 using namespace qpid::framing;
 using namespace qpid::concurrent;
 
-FanOutExchange::FanOutExchange(const string& _name) : name(_name) {}
+FanOutExchange::FanOutExchange(const std::string& name) : Exchange(name) {}
 
 void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
     Locker locker(lock);

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp Thu Sep 21 11:26:31 2006
@@ -22,10 +22,19 @@
 using namespace qpid::broker;
 using namespace qpid::io;
 
+namespace
+{
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+}
+
 SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
-    exchanges.declare(new DirectExchange("amq.direct"));
-    exchanges.declare(new TopicExchange("amq.topic"));
-    exchanges.declare(new FanOutExchange("amq.fanout"));
+    exchanges.declare(new DirectExchange(empty)); // Default exchange.
+    exchanges.declare(new DirectExchange(amq_direct));
+    exchanges.declare(new TopicExchange(amq_topic));
+    exchanges.declare(new FanOutExchange(amq_fanout));
     cleaner.start();
 }
 
@@ -35,6 +44,4 @@
 
 SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
     cleaner.stop();
-    exchanges.destroy("amq.direct");
-    exchanges.destroy("amq.topic");    
 }

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp Thu Sep 21 11:26:31 2006
@@ -256,7 +256,7 @@
 	if (queue_created.second) { // This is a new queue
 	    parent->channels[channel]->setDefaultQueue(queue);
 	    //add default binding:
-	    parent->exchanges->get("amq.direct")->bind(queue, name, 0);
+	    parent->exchanges->getDefault()->bind(queue, name, 0);
 	    if(exclusive){
 		parent->exclusiveQueues.push_back(queue);
 	    } else if(autoDelete){
@@ -280,7 +280,7 @@
     Queue::shared_ptr queue = parent->getQueue(queueName, channel);
     Exchange* exchange = parent->exchanges->get(exchangeName);
     if(exchange){
-        if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName();
+        if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
         exchange->bind(queue, routingKey, &arguments);
         if(!nowait) parent->client.getQueue().bindOk(channel);    
     }else{
@@ -361,7 +361,7 @@
                                                    string& exchange, string& routingKey,

                                                    bool mandatory, bool immediate){
 
-    Message* msg = new Message(parent, exchange.length() ? exchange : "amq.direct", routingKey, mandatory, immediate);
+    Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate);
     parent->channels[channel]->handlePublish(msg);
 } 
         

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/TopicExchange.cpp?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/TopicExchange.cpp Thu Sep 21 11:26:31 2006
@@ -17,46 +17,146 @@
  */
 #include "TopicExchange.h"
 #include "ExchangeBinding.h"
+#include <algorithm>
 
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-TopicExchange::TopicExchange(const string& _name) : name(_name) {
 
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// Areas for improvement:
+// - excessive string copying: should be 0 copy, match from original buffer.
+// - match/lookup: use decision tree or other more efficient structure.
+
+Tokens& Tokens::operator=(const std::string& s) {
+    clear();
+    if (s.empty()) return *this;
+    std::string::const_iterator i = s.begin();
+    while (true) {
+        // Invariant: i is at the beginning of the next untokenized word.
+        std::string::const_iterator j = find(i, s.end(), '.');
+        push_back(std::string(i, j));
+        if (j == s.end()) return *this;
+        i = j + 1;
+    }
+    return *this;
+}
+    
+size_t Tokens::Hash::operator()(const Tokens& p) const {
+    size_t hash = 0;
+    for (Tokens::const_iterator i = p.begin(); i != p.end(); ++i)
+        hash += std::tr1::hash<std::string>()(*i);
+    return hash;
+}
+
+TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
+    Tokens::operator=(tokens);
+    normalize();
+    return *this;
+}
+
+namespace {
+const std::string hashmark("#");
+const std::string star("*");
 }
 
+void TopicPattern::normalize() {
+    std::string word;
+    Tokens::iterator i = begin();
+    while (i != end()) {
+        if (*i == hashmark) {
+            ++i;
+            while (i != end()) {
+                // Invariant: *(i-1)==#, [begin()..i-1] is normalized.
+                if (*i == star) { // Move * before #.
+                    std::swap(*i, *(i-1));
+                    ++i;
+                } else if (*i == hashmark) {
+                    i = erase(i); // Remove extra #
+                } else {
+                    break;
+                }
+            }
+        } else {
+            i ++;
+        }
+    }
+}
+
+
+namespace {
+// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string.
+// Need more efficient Tokens impl that can operate on a string in place.
+// 
+bool do_match(Tokens::const_iterator pattern_begin,  Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin,  Tokens::const_iterator target_end)
+{
+    // Invariant: [pattern_begin..p) matches [target_begin..t)
+    Tokens::const_iterator p = pattern_begin;
+    Tokens::const_iterator t = target_begin;
+    while (p != pattern_end && t != target_end)
+    {
+        if (*p == star || *p == *t) {
+            ++p, ++t;
+        } else if (*p == hashmark) {
+            ++p;
+            if (do_match(p, pattern_end, t, target_end)) return true;
+            while (t != target_end) {
+                ++t;
+                if (do_match(p, pattern_end, t, target_end)) return true;
+            }
+            return false;
+        } else {
+            return false;
+        }
+    }
+    while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing #
+    return t == target_end && p == pattern_end;
+}
+}
+
+bool TopicPattern::match(const Tokens& target)  const
+{
+    return do_match(begin(), end(), target.begin(), target.end());
+}
+
+TopicExchange::TopicExchange(const string& name) : Exchange(name) { }
+
 void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
     lock.acquire();
-    bindings[routingKey].push_back(queue);
+    TopicPattern routingPattern(routingKey);
+    bindings[routingPattern].push_back(queue);
     queue->bound(new ExchangeBinding(this, queue, routingKey, args));
     lock.release();
 }
 
 void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
     lock.acquire();
-    std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
-
-    std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
-    if(i < queues.end()){
-        queues.erase(i);
-        if(queues.empty()){
-            bindings.erase(routingKey);
-        }
-    }
+    BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
+    if (bi != bindings.end()) { // Check before dereferencing; always reach lock.release().
+        Queue::vector& qv(bi->second);
+        Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
+        if (q != qv.end()) qv.erase(q);
+        if (qv.empty()) bindings.erase(bi);
+    }
     lock.release();
 }
 
+
 void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
     lock.acquire();
-    std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
-    for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++){
-        (*i)->deliver(msg);
+    for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+        if (i->first.match(routingKey)) {
+            Queue::vector& qv(i->second);
+            for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
+                (*j)->deliver(msg);
+            }
+        }
     }
     lock.release();
 }
 
-TopicExchange::~TopicExchange(){
-
-}
+TopicExchange::~TopicExchange() {}
 
 const std::string TopicExchange::typeName("topic");
+
+
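
Illustration (not part of this commit): the normalization rule documented in
TopicExchange.h (every run of mixed "*" and "#" words becomes the "*" words
followed by at most one "#") can also be expressed by counting rather than
swapping in place. This is a sketch of an alternative formulation, not the
committed algorithm; normalizePattern and exampleUsage are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

typedef std::vector<std::string> Words;

// Copy plain words through unchanged. For each run of wildcard words,
// emit all of its "*" words first and then a single "#" if the run
// contained at least one "#".
Words normalizePattern(const Words& in) {
    Words out;
    std::size_t i = 0;
    while (i < in.size()) {
        if (in[i] != "*" && in[i] != "#") {
            out.push_back(in[i++]);
            continue;
        }
        std::size_t stars = 0;
        bool sawHash = false;
        for (; i < in.size() && (in[i] == "*" || in[i] == "#"); ++i) {
            if (in[i] == "*") ++stars;
            else sawHash = true;
        }
        out.insert(out.end(), stars, std::string("*"));
        if (sawHash) out.push_back("#");
    }
    return out;
}

// Hypothetical usage: "#.#.#" collapses to a single "#".
void exampleUsage() {
    Words in(3, std::string("#"));
    Words out = normalizePattern(in);
    assert(out.size() == 1 && out[0] == "#");
}
```

Building a new vector avoids erasing from the container while iterating over
it, at the cost of one extra copy of the pattern.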

Added: incubator/qpid/trunk/qpid/cpp/broker/test/TopicExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/test/TopicExchangeTest.cpp?view=auto&rev=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/test/TopicExchangeTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/test/TopicExchangeTest.cpp Thu Sep 21 11:26:31 2006
@@ -0,0 +1,186 @@
+#include "TopicExchange.h"
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+
+using namespace qpid::broker;
+
+Tokens makeTokens(char** begin, char** end)
+{
+    Tokens t;
+    t.insert(t.end(), begin, end);
+    return t;
+}
+
+// Calculate size of an array. 
+#define LEN(a) (sizeof(a)/sizeof(a[0]))
+
+// Convert array to token vector
+#define TOKENS(a) makeTokens(a, a + LEN(a))
+
+// Allow CPPUNIT_EQUALS to print a Tokens.
+// TODO aconway 2006-09-19: Make it a template and put it in a shared test lib.
+//
+CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, const Tokens& v)
+{
+    out << "[ ";
+    for (Tokens::const_iterator i = v.begin(); i != v.end(); ++i)
+    {
+        out << '"' << *i << '"' << (i+1 == v.end() ? "]" : ", ");
+    }
+    return out;
+}
+
+
+class TokensTest : public CppUnit::TestCase
+{
+    CPPUNIT_TEST_SUITE(TokensTest);
+    CPPUNIT_TEST(testTokens);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+    void testTokens() 
+    {
+        Tokens tokens("hello.world");
+        char* expect[] = {"hello", "world"};
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect), tokens);
+        
+        tokens = "a.b.c";
+        char* expect2[] = { "a", "b", "c" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect2), tokens);
+
+        tokens = "";
+        CPPUNIT_ASSERT(tokens.empty());
+
+        tokens = "x";
+        char* expect3[] = { "x" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect3), tokens);
+
+        tokens = (".x");
+        char* expect4[] = { "", "x" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect4), tokens);
+
+        tokens = ("x.");
+        char* expect5[] = { "x", "" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect5), tokens);
+
+        tokens = (".");
+        char* expect6[] = { "", "" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect6), tokens);        
+
+        tokens = ("..");
+        char* expect7[] = { "", "", "" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect7), tokens);        
+    }
+    
+};
+
+#define ASSERT_NORMALIZED(expect, pattern) \
+    CPPUNIT_ASSERT_EQUAL(Tokens(expect), static_cast<Tokens>(TopicPattern(pattern)))
+class TopicPatternTest : public CppUnit::TestCase 
+{
+    CPPUNIT_TEST_SUITE(TopicPatternTest);
+    CPPUNIT_TEST(testNormalize);
+    CPPUNIT_TEST(testPlain);
+    CPPUNIT_TEST(testStar);
+    CPPUNIT_TEST(testHash);
+    CPPUNIT_TEST(testMixed);
+    CPPUNIT_TEST(testCombo);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+
+    void testNormalize() 
+    {
+        CPPUNIT_ASSERT(TopicPattern("").empty());
+        ASSERT_NORMALIZED("a.b.c", "a.b.c");
+        ASSERT_NORMALIZED("a.*.c", "a.*.c");
+        ASSERT_NORMALIZED("#", "#");
+        ASSERT_NORMALIZED("#", "#.#.#.#");
+        ASSERT_NORMALIZED("*.*.*.#", "#.*.#.*.#.#.*");
+        ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*.#");
+        ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*");
+    }
+    
+    void testPlain() {
+        TopicPattern p("ab.cd.e");
+        CPPUNIT_ASSERT(p.match("ab.cd.e"));
+        CPPUNIT_ASSERT(!p.match("abx.cd.e"));
+        CPPUNIT_ASSERT(!p.match("ab.cd"));
+        CPPUNIT_ASSERT(!p.match("ab.cd..e."));
+        CPPUNIT_ASSERT(!p.match("ab.cd.e."));
+        CPPUNIT_ASSERT(!p.match(".ab.cd.e"));
+
+        p = "";
+        CPPUNIT_ASSERT(p.match(""));
+
+        p = ".";
+        CPPUNIT_ASSERT(p.match("."));
+    }
+
+
+    void testStar() 
+    {
+        TopicPattern p("a.*.b");
+        CPPUNIT_ASSERT(p.match("a.xx.b"));
+        CPPUNIT_ASSERT(!p.match("a.b"));
+
+        p = "*.x";
+        CPPUNIT_ASSERT(p.match("y.x"));
+        CPPUNIT_ASSERT(p.match(".x"));
+        CPPUNIT_ASSERT(!p.match("x"));
+
+        p = "x.x.*";
+        CPPUNIT_ASSERT(p.match("x.x.y"));
+        CPPUNIT_ASSERT(p.match("x.x."));
+        CPPUNIT_ASSERT(!p.match("x.x"));
+        CPPUNIT_ASSERT(!p.match("q.x.y"));
+    }
+
+    void testHash() 
+    {
+        TopicPattern p("a.#.b");
+        CPPUNIT_ASSERT(p.match("a.b"));
+        CPPUNIT_ASSERT(p.match("a.x.b"));
+        CPPUNIT_ASSERT(p.match("a..x.y.zz.b"));
+        CPPUNIT_ASSERT(!p.match("a.b."));
+        CPPUNIT_ASSERT(!p.match("q.x.b"));
+
+        p = "a.#";
+        CPPUNIT_ASSERT(p.match("a"));
+        CPPUNIT_ASSERT(p.match("a.b"));
+        CPPUNIT_ASSERT(p.match("a.b.c"));
+
+        p = "#.a";
+        CPPUNIT_ASSERT(p.match("a"));
+        CPPUNIT_ASSERT(p.match("x.y.a"));
+    }
+
+    void testMixed() 
+    {
+        TopicPattern p("*.x.#.y");
+        CPPUNIT_ASSERT(p.match("a.x.y"));
+        CPPUNIT_ASSERT(p.match("a.x.p.qq.y"));
+        CPPUNIT_ASSERT(!p.match("a.a.x.y"));
+        CPPUNIT_ASSERT(!p.match("aa.x.b.c"));
+
+        p = "a.#.b.*";
+        CPPUNIT_ASSERT(p.match("a.b.x"));
+        CPPUNIT_ASSERT(p.match("a.x.x.x.b.x"));
+    }
+
+    void testCombo() {
+        TopicPattern p("*.#.#.*.*.#");
+        CPPUNIT_ASSERT(p.match("x.y.z"));
+        CPPUNIT_ASSERT(p.match("x.y.z.a.b.c"));
+        CPPUNIT_ASSERT(!p.match("x.y"));
+        CPPUNIT_ASSERT(!p.match("x"));
+    }
+};
+
+    
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TopicPatternTest);
+CPPUNIT_TEST_SUITE_REGISTRATION(TokensTest);

Propchange: incubator/qpid/trunk/qpid/cpp/broker/test/TopicExchangeTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/client/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/Makefile?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/Makefile (original)
+++ incubator/qpid/trunk/qpid/cpp/client/Makefile Thu Sep 21 11:26:31 2006
@@ -25,11 +25,9 @@
 OBJECTS := $(subst .cpp,.o,$(SOURCES))
 CLIENT_LIB=$(LIB_DIR)/libqpid_client.so.1.0
 
-.PHONY: all test clean 
+.PHONY: all clean 
 
 all: $(CLIENT_LIB)
-
-test:
 	@$(MAKE) -C test all
 
 clean:

Propchange: incubator/qpid/trunk/qpid/cpp/client/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Sep 21 11:26:31 2006
@@ -0,0 +1,3 @@
+client_test
+topic_listener
+topic_publisher

Modified: incubator/qpid/trunk/qpid/cpp/common/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/Makefile?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/Makefile (original)
+++ incubator/qpid/trunk/qpid/cpp/common/Makefile Thu Sep 21 11:26:31 2006
@@ -31,14 +31,12 @@
 
 GENERATED_OBJECTS = framing/generated/amqp_methods.o
 
-.PHONY: all test clean
+.PHONY: all clean
 
 # We have to do two separate makes to ensure we pick up all generated files.
 all:
 	@$(MAKE) -C framing all
-	@make $(TARGET)
-
-test:
+	@$(MAKE) $(TARGET)
 	@$(MAKE) -C framing test
 
 clean:

Propchange: incubator/qpid/trunk/qpid/cpp/common/framing/generated/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Sep 21 11:26:31 2006
@@ -0,0 +1,2 @@
+*.cpp
+*.h

Modified: incubator/qpid/trunk/qpid/cpp/options.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/options.mk?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/options.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/options.mk Thu Sep 21 11:26:31 2006
@@ -1,4 +1,4 @@
- #l
+ #
  # Copyright (c) 2006 The Apache Software Foundation
  #
  # Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,29 +21,25 @@
 TOOLS_DIR = ${QPID_CPP_HOME}/tools
 LIB_DIR = ${QPID_CPP_HOME}/lib
 BIN_DIR = ${QPID_CPP_HOME}/bin
-APR_HOME= /usr
-BOOST_HOME= /usr
-CPPUNIT_HOME= /usr
+APR_HOME = /usr/local/apr
 
 # Compile flags
-DEBUG = -g
+DEBUG = -ggdb3
 # _USE_APR_IO_ set when APR IO build is desired.
 OPT   = -D _USE_APR_IO_ #-O3 
 APR_INCLUDES=-I ${APR_HOME}/include/apr-1/ 
-BOOST_INCLUDES=-I ${BOOST_HOME}/include/boost-1_33_1
-CPPUNIT_INCLUDES=-I ${CPPUNIT_HOME}/include
-COMMON_INCLUDES = -I ${COMMON_HOME}/framing/inc -I ${COMMON_HOME}/framing/generated -I ${COMMON_HOME}/concurrent/inc -I ${COMMON_HOME}/io/inc -I ${COMMON_HOME}/error/inc -I $(COMMON_HOME)/utils/inc ${APR_INCLUDES} ${BOOST_INCLUDES} ${CPPUNIT_INCLUDES}
+COMMON_INCLUDES = -I ${COMMON_HOME}/framing/inc -I ${COMMON_HOME}/framing/generated -I ${COMMON_HOME}/concurrent/inc -I ${COMMON_HOME}/io/inc -I ${COMMON_HOME}/error/inc -I $(COMMON_HOME)/utils/inc ${APR_INCLUDES}
 SRC_INCLUDES = $(COMMON_INCLUDES) -I inc
 TEST_INCLUDES = $(COMMON_INCLUDES) -I ../inc
 INCLUDES=$(SRC_INCLUDES)	# Default to src
 CXXFLAGS = $(DEBUG) $(OPT) -MMD -fpic $(INCLUDES) 
 
+# General link flags
+LDFLAGS= -L $(LIB_DIR) -L ${APR_HOME}/lib $(RPATH)
+
 # TODO aconway 2006-09-12: This is not something we want in a release
 # but it's useful for development.
 RPATH= -Wl,-rpath,$(CURDIR)/$(LIB_DIR)
-
-# General link flags
-LDFLAGS= -L $(LIB_DIR) -L ${APR_HOME}/lib -L ${BOOST_HOME}/lib -L ${CPPUNIT_HOME}/lib $(RPATH)
 
 # Libraries and executables. Use absolute paths so exes can find
 # libs wherever they are run. TODO: Proper library management.

Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Thu Sep 21 11:26:31 2006
@@ -14,11 +14,13 @@
 # limitations under the License.
 #
 
+#
 # Support library for qpid python tests.
 #
 
 import sys, re, unittest, os, random, logging
 import qpid.client, qpid.spec
+import Queue
 from getopt import getopt, GetoptError
 
 
@@ -188,26 +190,41 @@
         self.exchanges.append((channel,exchange))
         return reply
 
-    def assertPublishConsume(self, queue="", exchange="", routing_key=""):
-        """
-        Publish a message and consume it, assert it comes back intact.
+    def uniqueString(self):
+        """Generate a unique string, unique for this TestBase instance"""
+        # TODO aconway 2006-09-20: Not thread safe.
+        self.uniqueCounter = getattr(self, "uniqueCounter", 0) + 1
+        return "Test Message " + str(self.uniqueCounter)
+        
+    def consume(self, queueName):
+        """Consume from named queue returns the Queue object."""
+        reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+        return self.client.queue(reply.consumer_tag)
+
+    def assertEmpty(self, queue):
+        """Assert that the queue is empty"""
+        try:
+            queue.get(timeout=1)
+            self.fail("Queue is not empty.")
+        except Queue.Empty: pass              # Expected: nothing arrived
 
-        queue can be a single queue name or a list of queue names.
-        For a list assert the message appears on all queues.
-        Crude attempt to make unique messages so we can't consume
-        a message not really meant for us.
+    def assertPublishGet(self, queue, exchange="", routing_key=""):
         """
-        body = "TestMessage("+str(random.randint(999999, 1000000))+")"
+        Publish to exchange and assert queue.get() returns the same message.
+        """
+        body = self.uniqueString()
         self.channel.basic_publish(exchange=exchange,
                                    content=qpid.content.Content(body),
                                    routing_key=routing_key)
-        if not isinstance(queue, list): queue = [queue]
-        for q in queue:
-            reply = self.channel.basic_consume(queue=q, no_ack=True)
-            msg = self.client.queue(reply.consumer_tag).get(timeout=2)
-            self.assertEqual(body, msg.content.body)
-
+        self.assertEqual(body, queue.get(timeout=2).content.body)
         
+    def assertPublishConsume(self, queue="", exchange="", routing_key=""):
+        """
+        Publish a message and consume it, assert it comes back intact.
+        Return the Queue object used to consume.
+        """
+        q = self.consume(queue); self.assertPublishGet(q, exchange, routing_key); return q
+
     def assertChannelException(self, expectedCode, message): 
         self.assertEqual(message.method.klass.name, "channel")
         self.assertEqual(message.method.name, "close")
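
For illustration only, relating to the "Not thread safe" TODO on uniqueString above: a minimal Python sketch of a generator of unique message bodies that advances its counter on every call and guards it with a lock. The class name UniqueStrings and its prefix parameter are hypothetical and are not part of testlib.py.

```python
import itertools
import threading


class UniqueStrings(object):
    """Hand out strings that are unique for this instance."""

    def __init__(self, prefix="Test Message "):
        self._prefix = prefix
        self._counter = itertools.count(1)   # yields 1, 2, 3, ...
        self._lock = threading.Lock()

    def next(self):
        # The lock keeps two threads from drawing the same number.
        with self._lock:
            return self._prefix + str(next(self._counter))
```

Each instance keeps its own counter, so strings are unique per instance, as the uniqueString docstring describes.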

Modified: incubator/qpid/trunk/qpid/python/tests/exchange.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/exchange.py?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/exchange.py Thu Sep 21 11:26:31 2006
@@ -57,9 +57,25 @@
         self.channel.queue_bind(queue="q", exchange=ex)
         self.queue_declare(queue="p") 
         self.channel.queue_bind(queue="p", exchange=ex)
-        self.assertPublishConsume(exchange=ex, queue=["q","p"])
+        for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+
+    def verifyTopicExchange(self, ex):
+        """Verify that ex behaves like a topic exchange"""
+        self.queue_declare(queue="a")
+        self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+        q = self.consume("a")
+        self.assertPublishGet(q, ex, "a.b.x")
+        self.assertPublishGet(q, ex, "a.x.b.x")
+        self.assertPublishGet(q, ex, "a.x.x.b.x")
+
+        # Shouldn't match
+        self.channel.basic_publish(exchange=ex, routing_key="a.b")        
+        self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y")        
+        self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x")        
+        self.channel.basic_publish(exchange=ex, routing_key="a.b")
+        self.assert_(q.empty())
+
 
-    
 class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
     """
     The server SHOULD implement these standard exchange types: topic, headers.
@@ -76,6 +92,11 @@
         """Declare and test a fanout exchange"""
         self.exchange_declare(0, exchange="f", type="fanout")
         self.verifyFanOutExchange("f")
+
+    def testTopic(self):
+        """Declare and test a topic exchange"""
+        self.exchange_declare(0, exchange="t", type="topic")
+        self.verifyTopicExchange("t")
         
 
 class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
@@ -88,24 +109,17 @@
     exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if
     those types are defined).
     """
-    # TODO aconway 2006-09-01: Add tests for 3.1.3.1:
-    # - Test auto binding by q name
-    # - Test the nameless "default publish" exchange.
-    # - Auto created amq.fanout exchange
-
     def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
 
     def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
 
-    def testAmqTopic(self): 
-        self.exchange_declare(0, exchange="amq.topic", passive="true")
-        # TODO aconway 2006-09-14: verify topic behavior
+    def testAmqTopic(self):  self.verifyTopicExchange("amq.topic")
         
     def testAmqHeaders(self): 
         self.exchange_declare(0, exchange="amq.headers", passive="true")
         # TODO aconway 2006-09-14: verify headers behavior
 
-class DefaultExchangeRuleTests(TestBase):
+class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
     """
     The server MUST predeclare a direct exchange to act as the default exchange
     for content Publish methods and for default queue bindings.
@@ -115,6 +129,12 @@
     routing key but without specifying the exchange name, then ensuring that
     the message arrives in the queue correctly.
     """
+    def testDefaultExchange(self):
+        # Test automatic binding by queue name.
+        self.queue_declare(queue="d")
+        self.assertPublishConsume(queue="d", routing_key="d")
+        # Test explicit bind to default exchange
+        self.verifyDirectExchange("")
 
 
 class DefaultAccessRuleTests(TestBase):
@@ -123,7 +143,7 @@
     by specifying an empty exchange name in the Queue.Bind and content Publish
     methods.
     """
-
+    # TODO aconway 2006-09-18: fill this in.
 
 class ExtensionsRuleTests(TestBase):
     """

Modified: incubator/qpid/trunk/qpid/python/tests/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/queue.py?view=diff&rev=448624&r1=448623&r2=448624
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/queue.py Thu Sep 21 11:26:31 2006
@@ -156,9 +156,9 @@
 
         #straight-forward case:
         channel.queue_declare(queue="delete-me")
-        channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("a"))
-        channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("b"))
-        channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("c"))
       
+        channel.basic_publish(routing_key="delete-me", content=Content("a"))
+        channel.basic_publish(routing_key="delete-me", content=Content("b"))
+        channel.basic_publish(routing_key="delete-me", content=Content("c"))        
         reply = channel.queue_delete(queue="delete-me")
         self.assertEqual(3, reply.message_count)
         #check that it has gone by declaring passively
@@ -189,7 +189,7 @@
         #create a queue and add a message to it (use default binding):
         channel.queue_declare(queue="delete-me-2")
         channel.queue_declare(queue="delete-me-2", passive="True")
-        channel.basic_publish(exchange="amq.direct", routing_key="delete-me-2", content=Content("message"))
+        channel.basic_publish(routing_key="delete-me-2", content=Content("message"))
 
         #try to delete, but only if empty:
         try:


