qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1081801 - in /qpid/branches/qpid-2920/qpid: cpp/src/ cpp/src/qpid/ cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/src/tests/ java/client/src/main/java/org/apache/qpid/client/ java/systests/src/main/java/org/apache/qpid/test/client/destinat...
Date Tue, 15 Mar 2011 14:34:03 GMT
Author: aconway
Date: Tue Mar 15 14:34:02 2011
New Revision: 1081801

URL: http://svn.apache.org/viewvc?rev=1081801&view=rev
Log:
Merge branch 'trunk' into qpid-2920, trunk at r1081631

Added:
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/BufferRef.h   (with props)
Modified:
    qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/Modules.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/RefCountedBuffer.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/RefCountedBuffer.h
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.h
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Event.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Event.h
    qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cluster-benchmark
    qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark
    qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-receive.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-send.cpp
    qpid/branches/qpid-2920/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/branches/qpid-2920/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java

Modified: qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am Tue Mar 15 14:34:02 2011
@@ -176,7 +176,7 @@ nobase_include_HEADERS +=			\
   ../include/qpid/sys/posix/Time.h		\
   ../include/qpid/sys/posix/check.h
 
-if HAVE_EPOLL 
+if HAVE_EPOLL
   poller = qpid/sys/epoll/EpollPoller.cpp
 endif
 
@@ -195,7 +195,7 @@ libqpidcommon_la_SOURCES += $(poller) $(
 posix_broker_src = \
   qpid/broker/posix/BrokerDefaults.cpp
 
-lib_LTLIBRARIES = libqpidtypes.la libqpidcommon.la libqpidbroker.la libqpidclient.la libqpidmessaging.la 
+lib_LTLIBRARIES = libqpidtypes.la libqpidcommon.la libqpidbroker.la libqpidclient.la libqpidmessaging.la
 
 # Definitions for client and daemon plugins
 PLUGINLDFLAGS=-no-undefined -module -avoid-version
@@ -203,7 +203,7 @@ confdir=$(sysconfdir)/qpid
 dmoduledir=$(libdir)/qpid/daemon
 cmoduledir=$(libdir)/qpid/client
 dmodule_LTLIBRARIES =
-cmodule_LTLIBRARIES = 
+cmodule_LTLIBRARIES =
 
 include cluster.mk
 include acl.mk
@@ -341,6 +341,7 @@ libqpidcommon_la_SOURCES +=			\
   qpid/RefCounted.h				\
   qpid/RefCountedBuffer.cpp			\
   qpid/RefCountedBuffer.h			\
+  qpid/BufferRef.h				\
   qpid/Sasl.h                                   \
   qpid/SaslFactory.cpp                          \
   qpid/SaslFactory.h                            \
@@ -894,6 +895,6 @@ dist-hook: $(BUILT_SOURCES)
 install-data-local:
 	$(mkinstalldirs) $(DESTDIR)/$(localstatedir)/lib/qpidd
 
-# Support for pkg-config 
+# Support for pkg-config
 pkgconfigdir = $(libdir)/pkgconfig
 pkgconfig_DATA = qpid.pc

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/BufferRef.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/BufferRef.h?rev=1081801&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/BufferRef.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/BufferRef.h Tue Mar 15 14:34:02 2011
@@ -0,0 +1,70 @@
+#ifndef QPID_BUFFERREF_H
+#define QPID_BUFFERREF_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+/** Template for mutable or const buffer references */
+template <class T> class BufferRefT {
+  public:
+    BufferRefT() : begin_(0), end_(0) {}
+
+    BufferRefT(boost::intrusive_ptr<RefCounted> c, T* begin, T* end) :
+        counter(c), begin_(begin), end_(end) {}
+
+    template <class U> BufferRefT(const BufferRefT<U>& other) :
+        counter(other.counter), begin_(other.begin_), end_(other.end_) {}
+
+    T* begin() const { return begin_; }
+    T* end() const { return end_; }
+
+    /** Return a sub-buffer of the current buffer */
+    BufferRefT sub_buffer(T* begin, T* end) {
+        assert(begin_ <= begin && begin <= end_);
+        assert(begin_ <= end && end <= end_);
+        assert(begin <= end);
+        return BufferRefT(counter, begin, end);
+    }
+
+  private:
+    boost::intrusive_ptr<RefCounted> counter;
+    T* begin_;
+    T* end_;
+};
+
+/**
+ * Reference to a mutable ref-counted buffer.
+ */
+typedef BufferRefT<char> BufferRef;
+
+/**
+ * Reference to a const ref-counted buffer.
+ */
+typedef BufferRefT<const char> ConstBufferRef;
+
+} // namespace qpid
+
+#endif  /*!QPID_BUFFERREF_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/BufferRef.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/BufferRef.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/Modules.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/Modules.cpp?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/Modules.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/Modules.cpp Tue Mar 15 14:34:02 2011
@@ -64,7 +64,6 @@ void tryShlib(const char* libname_, bool
     if (!isShlibName(libname)) libname += suffix();
     try {
         sys::Shlib shlib(libname);
-        QPID_LOG (info, "Loaded Module: " << libname);
     }
     catch (const std::exception& /*e*/) {
         if (!noThrow)
@@ -82,7 +81,7 @@ void loadModuleDir (std::string dirname,
             return;
         throw Exception ("Directory not found: " + dirname);
     }
-    if (!fs::is_directory(dirPath)) 
+    if (!fs::is_directory(dirPath))
     {
         throw Exception ("Invalid value for module-dir: " + dirname + " is not a directory");
     }

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/RefCountedBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/RefCountedBuffer.cpp?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/RefCountedBuffer.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/RefCountedBuffer.cpp Tue Mar 15 14:34:02 2011
@@ -24,30 +24,20 @@
 
 namespace qpid {
 
-RefCountedBuffer::RefCountedBuffer() : count(0) {}
-
-void RefCountedBuffer::destroy() const {
+void RefCountedBuffer::released() const {
     this->~RefCountedBuffer();
     ::delete[] reinterpret_cast<const char*>(this);
 }
 
-char* RefCountedBuffer::addr() const {
-    return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer));
-}
-
-RefCountedBuffer::pointer RefCountedBuffer::create(size_t n) {
+BufferRef RefCountedBuffer::create(size_t n) {
     char* store=::new char[n+sizeof(RefCountedBuffer)];
     new(store) RefCountedBuffer;
-    return pointer(reinterpret_cast<RefCountedBuffer*>(store));
+    char* start = store+sizeof(RefCountedBuffer);
+    return BufferRef(
+        boost::intrusive_ptr<RefCounted>(reinterpret_cast<RefCountedBuffer*>(store)),
+        start, start+n);
 }
 
-RefCountedBuffer::pointer::pointer() {}
-RefCountedBuffer::pointer::pointer(RefCountedBuffer* x) : p(x) {}
-RefCountedBuffer::pointer::pointer(const pointer& x) : p(x.p) {}
-RefCountedBuffer::pointer::~pointer() {}
-RefCountedBuffer::pointer& RefCountedBuffer::pointer::operator=(const RefCountedBuffer::pointer& x) { p = x.p; return *this; }
-
-char* RefCountedBuffer::pointer::cp() const { return p ? p->get() : 0; }
 } // namespace qpid
 
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/RefCountedBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/RefCountedBuffer.h?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/RefCountedBuffer.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/RefCountedBuffer.h Tue Mar 15 14:34:02 2011
@@ -22,68 +22,23 @@
  *
  */
 
-#include <boost/utility.hpp>
-#include <boost/detail/atomic_count.hpp>
-#include <boost/intrusive_ptr.hpp>
+#include <qpid/RefCounted.h>
+#include <qpid/BufferRef.h>
 
 namespace qpid {
 
 /**
- * Reference-counted byte buffer.
- * No alignment guarantees.
+ * Reference-counted byte buffer. No alignment guarantees.
  */
-class RefCountedBuffer : boost::noncopyable {
-    mutable boost::detail::atomic_count count;
-    RefCountedBuffer();
-    void destroy() const;
-    char* addr() const;
-
-public:
-    /** Smart char pointer to a reference counted buffer */
-    class pointer {
-        boost::intrusive_ptr<RefCountedBuffer> p;
-        char* cp() const;
-        pointer(RefCountedBuffer* x);
-      friend class RefCountedBuffer;
-
-      public:
-        pointer();
-        pointer(const pointer&);
-        ~pointer();
-        pointer& operator=(const pointer&);
-
-        char* get() { return cp(); }
-        operator char*() { return cp(); }
-        char& operator*() { return *cp(); }
-        char& operator[](size_t i) { return cp()[i]; }
-
-        const char* get() const { return cp(); }
-        operator const char*() const { return cp(); }
-        const char& operator*() const { return *cp(); }
-        const char& operator[](size_t i) const { return cp()[i]; }
-    };
-
+class RefCountedBuffer : public RefCounted {
+  public:
     /** Create a reference counted buffer of size n */
-    static pointer create(size_t n);
+    static BufferRef create(size_t n);
 
-    /** Get a pointer to the start of the buffer. */
-    char* get() { return addr(); }
-    const char* get() const { return addr(); }
-    char& operator[](size_t i) { return get()[i]; }
-    const char& operator[](size_t i) const { return get()[i]; }
-
-    void addRef() const { ++count; }
-    void release() const { if (--count==0) destroy(); }
-    long refCount() { return count; }
+  protected:
+    void released() const;
 };
 
 } // namespace qpid
 
-// intrusive_ptr support.
-namespace boost {
-inline void intrusive_ptr_add_ref(const qpid::RefCountedBuffer* p) { p->addRef(); }
-inline void intrusive_ptr_release(const qpid::RefCountedBuffer* p) { p->release(); }
-}
-
-
 #endif  /*!QPID_REFCOUNTEDBUFFER_H*/

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.cpp Tue Mar 15 14:34:02 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ namespace _qmf = qmf::org::apache::qpid:
 // - excessive string copying: should be 0 copy, match from original buffer.
 // - match/lookup: use descision tree or other more efficient structure.
 
-namespace 
+namespace
 {
 const std::string STAR("*");
 const std::string HASH("#");
@@ -110,7 +110,7 @@ public:
 // Iterate over a string of '.'-separated tokens.
 struct TopicExchange::TokenIterator {
     typedef pair<const char*,const char*> Token;
-    
+
     TokenIterator(const char* b, const char* e) : end(e), token(make_pair(b, find(b,e,'.'))) {}
 
     TokenIterator(const string& key) : end(&key[0]+key.size()), token(make_pair(&key[0], find(&key[0],end,'.'))) {}
@@ -221,7 +221,7 @@ TopicExchange::TopicExchange(const std::
 
 bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
 {
-	ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
+    ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
     string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
     string fedTags(args ? args->getAsString(qpidFedTags) : "");
     string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
@@ -282,6 +282,7 @@ bool TopicExchange::bind(Queue::shared_p
         }
     }
 
+    cc.clearCache(); // clear the cache before we IVE route.
     routeIVE();
     if (propagate)
         propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
@@ -289,7 +290,7 @@ bool TopicExchange::bind(Queue::shared_p
 }
 
 bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){
-	ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
+    ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
     RWlock::ScopedWlock l(lock);
     string routingKey = normalize(constRoutingKey);
     BindingKey* bk = bindingTree.getBindingKey(routingKey);
@@ -336,23 +337,24 @@ void TopicExchange::route(Deliverable& m
 {
     // Note: PERFORMANCE CRITICAL!!!
     BindingList b;
-	std::map<std::string, BindingList>::iterator it;
-	{  // only lock the cache for read
+    std::map<std::string, BindingList>::iterator it;
+    {  // only lock the cache for read
        RWlock::ScopedRlock cl(cacheLock);
-	   it = bindingCache.find(routingKey);
-	}
+       it = bindingCache.find(routingKey);
+       if (it != bindingCache.end()) {
+           b = it->second;
+       }
+    }
     PreRoute pr(msg, this);
-    if (it == bindingCache.end())  // no cache hit
+    if (!b.get())  // no cache hit
     {
         RWlock::ScopedRlock l(lock);
     	b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
         BindingsFinderIter bindingsFinder(b);
         bindingTree.iterateMatch(routingKey, bindingsFinder);
-	    RWlock::ScopedWlock cwl(cacheLock);
-		bindingCache[routingKey] = b; // update cache
-    }else {
-        b = it->second;
-     }
+        RWlock::ScopedWlock cwl(cacheLock);
+        bindingCache[routingKey] = b; // update cache
+    }
     doRoute(msg, b);
 }
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/TopicExchange.h Tue Mar 15 14:34:02 2011
@@ -136,18 +136,26 @@ class TopicExchange : public virtual Exc
     unsigned long nBindings;
     qpid::sys::RWlock lock;     // protects bindingTree and nBindings
     qpid::sys::RWlock cacheLock;     // protects cache
-	std::map<std::string, BindingList> bindingCache; // cache of matched routes.
-	class ClearCache {
-	private:
-		qpid::sys::RWlock* cacheLock;
-		std::map<std::string, BindingList>* bindingCache; 
-	public:
-		ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),bindingCache(bc) {};
-		~ClearCache(){ 
-			qpid::sys::RWlock::ScopedWlock l(*cacheLock);
-			bindingCache->clear();   
-		};
-	};
+    std::map<std::string, BindingList> bindingCache; // cache of matched routes.
+    class ClearCache {
+    private:
+        qpid::sys::RWlock* cacheLock;
+        std::map<std::string, BindingList>* bindingCache;
+	bool cleared;
+    public:
+        ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),
+             bindingCache(bc),cleared(false) {};
+        void clearCache() {
+             qpid::sys::RWlock::ScopedWlock l(*cacheLock);
+             if (!cleared) {
+                 bindingCache->clear();
+                 cleared =true;
+             }
+        };
+        ~ClearCache(){
+	     clearCache();
+        };
+    };
     bool isBound(Queue::shared_ptr queue, const std::string& pattern);
 
     class ReOriginIter;

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Event.cpp?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Event.cpp Tue Mar 15 14:34:02 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
 #include "qpid/cluster/Cpg.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/RefCountedBuffer.h"
 #include "qpid/assert.h"
 #include <ostream>
 #include <iterator>

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Event.h?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/Event.h Tue Mar 15 14:34:02 2011
@@ -23,7 +23,7 @@
  */
 
 #include "qpid/cluster/types.h"
-#include "qpid/RefCountedBuffer.h"
+#include "qpid/BufferRef.h"
 #include "qpid/framing/AMQFrame.h"
 #include <sys/uio.h>            // For iovec
 #include <iosfwd>
@@ -88,12 +88,12 @@ class Event : public EventHeader {
     static Event control(const framing::AMQFrame&, const ConnectionId&);
 
     // Data excluding header.
-    char* getData() { return store + HEADER_SIZE; }
-    const char* getData() const { return store + HEADER_SIZE; }
+    char* getData() { return store.begin() + HEADER_SIZE; }
+    const char* getData() const { return store.begin() + HEADER_SIZE; }
 
     // Store including header
-    char* getStore() { return store; }
-    const char* getStore() const { return store; }
+    char* getStore() { return store.begin(); }
+    const char* getStore() const { return store.begin(); }
 
     const framing::AMQFrame& getFrame() const;
 
@@ -104,7 +104,7 @@ class Event : public EventHeader {
   private:
     void encodeHeader() const;
 
-    RefCountedBuffer::pointer store;
+    BufferRef store;
     mutable framing::AMQFrame frame;
 };
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cluster-benchmark Tue Mar 15 14:34:02 2011
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,21 +19,37 @@
 #
 
 # Benchmark script for comparing cluster performance.
-#PORT=":5555"
-BROKER=`echo $HOSTS | awk '{print $1}'`	# Single broker
-BROKERS=`echo $HOSTS | sed "s/\>/$PORT/g;s/ /,/g"` # Broker URL list
-COUNT=100000
-RATE=20000			# Rate to throttle senders for latency results
-run_test() { echo $*; "$@"; echo; echo; echo; }
 
-# Thruput,  unshared queue
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp -m $COUNT
+# Default values
+PORT="5672"
+BROKERS=`echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g"` # Broker URL list
+COUNT=10000
+FLOW=100	      # Flow control limit on queue depth for latency.
+REPEAT=10
+SCALE=10
 
-# Latency
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --connection-options '{tcp-nodelay:true}' -m `expr $COUNT / 2` --send-rate $RATE
+while getopts "p:c:f:r:t:b:" opt; do
+    case $opt in
+	p) PORT=$OPTARG;;
+	c) COUNT=$OPTARG;;
+	f) FLOW=$OPTARG;;
+	r) REPEAT=$OPTARG;;
+	s) SCALE=$OPTARG;;
+	b) BROKERS=$OPTARG;;
+	*) echo "Unknown option"; exit 1;;
+    esac
+done
+
+BROKER=`echo $HOSTS | sed 's/,.*//'` # First broker
+
+run_test() { echo $*; shift; "$@"; echo; echo; echo; }
 
 # Multiple pubs/subs connect via multiple brokers (active-active)
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKERS --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10`
+run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT
 
 # Multiple pubs/subs connect via single broker (active-passive)
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10`
+run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT
+
+# Latency
+run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW
+

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-cpp-benchmark Tue Mar 15 14:34:02 2011
@@ -77,6 +77,20 @@ def ssh_command(host, command):
     """Convert command into an ssh command on host with quoting"""
     return ["ssh", host] + [posix_quote(arg) for arg in command]
 
+class Clients:
+    def __init__(self): self.clients=[]
+
+    def add(self, client):
+        self.clients.append(client)
+        return client
+
+    def kill(self):
+        for c in self.clients:
+            try: c.kill()
+            except: pass
+
+clients = Clients()
+
 def start_receive(queue, index, opts, ready_queue, broker, host):
     address_opts=["create:receiver"] + opts.receive_option
     if opts.durable: address_opts += ["node:{durable:true}"]
@@ -101,7 +115,7 @@ def start_receive(queue, index, opts, re
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return Popen(command, stdout=PIPE)
+    return clients.add(Popen(command, stdout=PIPE))
 
 def start_send(queue, opts, broker, host):
     address="%s;{%s}"%(queue,",".join(opts.send_option))
@@ -122,7 +136,7 @@ def start_send(queue, opts, broker, host
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return Popen(command, stdout=PIPE)
+    return clients.add(Popen(command, stdout=PIPE))
 
 def first_line(p):
     out,err=p.communicate()
@@ -133,7 +147,11 @@ def delete_queues(queues, broker):
     c = qpid.messaging.Connection(broker)
     c.open()
     for q in queues:
-        try: s = c.session().sender("%s;{delete:always}"%(q))
+        try:
+            s = c.session()
+            snd = s.sender("%s;{delete:always}"%(q))
+            snd.close()
+            s.sync()
         except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
     c.close()
 
@@ -145,7 +163,6 @@ def print_header(timestamp):
 def parse(parser, lines):               # Parse sender/receiver output
     for l in lines:
         fn_val = zip(parser, l)
-
     return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
 
 def parse_senders(senders):
@@ -156,11 +173,12 @@ def parse_receivers(receivers):
 
 def print_data(send_stats, recv_stats):
     for send,recv in map(None, send_stats, recv_stats):
-        if send: print send[0],
+        line=""
+        if send: line += "%d"%send[0]
         if recv:
-            print "\t\t%d"%recv[0],
-            if len(recv) == 4: print "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]),
-        print
+            line += "\t\t%d"%recv[0]
+            if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+        print line
 
 def print_summary(send_stats, recv_stats):
     def avg(s): sum(s) / len(s)
@@ -184,11 +202,11 @@ class ReadyReceiver:
         self.receiver = self.connection.session().receiver(
             "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
         self.receiver.session.sync()
-        self.timeout=2
+        self.timeout=10
 
     def wait(self, receivers):
         try:
-            for i in xrange(len(receivers)): self.receiver.fetch(self.timeout)
+            for i in receivers: self.receiver.fetch(self.timeout)
             self.connection.close()
         except qpid.messaging.Empty:
             for r in receivers:
@@ -221,20 +239,22 @@ def main():
     receive_out = ""
     ready_queue="%s-ready"%(opts.queue_name)
     queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
-    for i in xrange(opts.repeat):
-        delete_queues(queues, opts.broker[0])
-        ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
-        receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
-                     for q in queues for j in xrange(opts.receivers)]
-        ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
-        senders = [start_send(q, opts,brokers.next(), client_hosts.next())
-                   for q in queues for j in xrange(opts.senders)]
-        if opts.report_header and i == 0: print_header(opts.timestamp)
-        send_stats=parse_senders(senders)
-        recv_stats=parse_receivers(receivers)
-        if opts.summarize: print_summary(send_stats, recv_stats)
-        else: print_data(send_stats, recv_stats)
-        delete_queues(queues, opts.broker[0])
+    try:
+        for i in xrange(opts.repeat):
+            delete_queues(queues, opts.broker[0])
+            ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+            receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+                         for q in queues for j in xrange(opts.receivers)]
+            ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
+            senders = [start_send(q, opts,brokers.next(), client_hosts.next())
+                       for q in queues for j in xrange(opts.senders)]
+            if opts.report_header and i == 0: print_header(opts.timestamp)
+            send_stats=parse_senders(senders)
+            recv_stats=parse_receivers(receivers)
+            if opts.summarize: print_summary(send_stats, recv_stats)
+            else: print_data(send_stats, recv_stats)
+            delete_queues(queues, opts.broker[0])
+    finally: clients.kill()             # No strays
 
 if __name__ == "__main__": main()
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-receive.cpp?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-receive.cpp Tue Mar 15 14:34:02 2011
@@ -262,7 +262,7 @@ int main(int argc, char ** argv)
             return 0;
         }
     } catch(const std::exception& error) {
-        std::cerr << "Failure: " << error.what() << std::endl;
+        std::cerr << "qpid-receive: " << error.what() << std::endl;
         connection.close();
         return 1;
     }

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-send.cpp?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/qpid-send.cpp Tue Mar 15 14:34:02 2011
@@ -368,7 +368,7 @@ int main(int argc, char ** argv)
             return 0;
         }
     } catch(const std::exception& error) {
-        std::cout << "Failed: " << error.what() << std::endl;
+        std::cerr << "qpid-send: " << error.what() << std::endl;
         connection.close();
         return 1;
     }

Modified: qpid/branches/qpid-2920/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/qpid-2920/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Mar 15 14:34:02 2011
@@ -1225,7 +1225,6 @@ public abstract class AMQSession<C exten
                 else
                 {
                     AMQQueue queue = new AMQQueue(queueName);
-                    queue.setCreate(AddressOption.ALWAYS);
                     return queue;
                     
                 }

Modified: qpid/branches/qpid-2920/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1081801&r1=1081800&r2=1081801&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/branches/qpid-2920/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue Mar 15 14:34:02 2011
@@ -1,6 +1,6 @@
 package org.apache.qpid.test.client.destination;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.qpid.test.client.dest
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -58,7 +58,7 @@ public class AddressBasedDestinationTest
 {
     private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class);
     private Connection _connection;
-    
+
     @Override
     public void setUp() throws Exception
     {
@@ -66,20 +66,20 @@ public class AddressBasedDestinationTest
         _connection = getConnection() ;
         _connection.start();
     }
-    
+
     @Override
     public void tearDown() throws Exception
     {
         _connection.close();
         super.tearDown();
     }
-    
+
     public void testCreateOptions() throws Exception
     {
         Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         MessageProducer prod;
         MessageConsumer cons;
-        
+
         // default (create never, assert never) -------------------
         // create never --------------------------------------------
         String addr1 = "ADDR:testQueue1";
@@ -93,7 +93,7 @@ public class AddressBasedDestinationTest
             assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " +
                     "doesn't resolve to an exchange or a queue"));
         }
-        
+
         try
         {
             prod = jmsSession.createProducer(dest);
@@ -103,22 +103,22 @@ public class AddressBasedDestinationTest
             assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " +
                     "doesn't resolve to an exchange or a queue"));
         }
-            
+
         assertFalse("Queue should not be created",(
                 (AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true));
-        
-        
+
+
         // create always -------------------------------------------
         addr1 = "ADDR:testQueue1; { create: always }";
         dest = new AMQAnyDestination(addr1);
-        cons = jmsSession.createConsumer(dest); 
-        
+        cons = jmsSession.createConsumer(dest);
+
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("", 
+                (AMQSession_0_10)jmsSession).isQueueBound("",
                     dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
-        
+
         // create receiver -----------------------------------------
         addr1 = "ADDR:testQueue2; { create: receiver }";
         dest = new AMQAnyDestination(addr1);
@@ -131,32 +131,32 @@ public class AddressBasedDestinationTest
             assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " +
                     "doesn't resolve to an exchange or a queue"));
         }
-            
+
         assertFalse("Queue should not be created",(
                 (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-        
-        
-        cons = jmsSession.createConsumer(dest); 
-        
+
+
+        cons = jmsSession.createConsumer(dest);
+
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("", 
+                (AMQSession_0_10)jmsSession).isQueueBound("",
                     dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
-        
+
         // create never --------------------------------------------
         addr1 = "ADDR:testQueue3; { create: never }";
         dest = new AMQAnyDestination(addr1);
         try
         {
-            cons = jmsSession.createConsumer(dest); 
+            cons = jmsSession.createConsumer(dest);
         }
         catch(JMSException e)
         {
             assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
                     "doesn't resolve to an exchange or a queue"));
         }
-        
+
         try
         {
             prod = jmsSession.createProducer(dest);
@@ -166,17 +166,17 @@ public class AddressBasedDestinationTest
             assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " +
                     "doesn't resolve to an exchange or a queue"));
         }
-            
+
         assertFalse("Queue should not be created",(
                 (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-        
+
         // create sender ------------------------------------------
         addr1 = "ADDR:testQueue3; { create: sender }";
         dest = new AMQAnyDestination(addr1);
-                
+
         try
         {
-            cons = jmsSession.createConsumer(dest); 
+            cons = jmsSession.createConsumer(dest);
         }
         catch(JMSException e)
         {
@@ -185,162 +185,162 @@ public class AddressBasedDestinationTest
         }
         assertFalse("Queue should not be created",(
                 (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-        
+
         prod = jmsSession.createProducer(dest);
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("", 
+                (AMQSession_0_10)jmsSession).isQueueBound("",
                     dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
-        
+
     }
-    
+
     // todo add tests for delete options
-    
+
     public void testCreateQueue() throws Exception
     {
         Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
+
         String addr = "ADDR:my-queue/hello; " +
-                      "{" + 
+                      "{" +
                             "create: always, " +
-                            "node: " + 
-                            "{" + 
+                            "node: " +
+                            "{" +
                                  "durable: true ," +
                                  "x-declare: " +
-                                 "{" + 
+                                 "{" +
                                      "auto-delete: true," +
-                                     "arguments: {" +  
+                                     "arguments: {" +
                                         "'qpid.max_size': 1000," +
                                         "'qpid.max_count': 100" +
-                                     "}" + 
-                                  "}, " +   
-                                  "x-bindings: [{exchange : 'amq.direct', key : test}, " + 
+                                     "}" +
+                                  "}, " +
+                                  "x-bindings: [{exchange : 'amq.direct', key : test}, " +
                                                "{exchange : 'amq.fanout'}," +
                                                "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," +
                                                "{exchange : 'amq.topic', key : 'a.#'}" +
-                                              "]," + 
-                                     
+                                              "]," +
+
                             "}" +
                       "}";
         AMQDestination dest = new AMQAnyDestination(addr);
-        MessageConsumer cons = jmsSession.createConsumer(dest); 
-        
+        MessageConsumer cons = jmsSession.createConsumer(dest);
+
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
-        
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("", 
+                (AMQSession_0_10)jmsSession).isQueueBound("",
                     dest.getAddressName(),dest.getAddressName(), null));
-        
+
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", 
+                (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
                     dest.getAddressName(),"test", null));
-        
+
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", 
+                (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout",
                     dest.getAddressName(),null, null));
-        
+
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", 
-                    dest.getAddressName(),"a.#", null));   
-        
+                (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+                    dest.getAddressName(),"a.#", null));
+
         Map<String,Object> args = new HashMap<String,Object>();
         args.put("x-match","any");
         args.put("dep","sales");
         args.put("loc","CA");
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.match", 
+                (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
                     dest.getAddressName(),null, args));
-        
+
     }
-    
+
     public void testCreateExchange() throws Exception
     {
         Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
-        String addr = "ADDR:my-exchange/hello; " + 
-                      "{ " + 
-                        "create: always, " +                        
-                        "node: " + 
+
+        String addr = "ADDR:my-exchange/hello; " +
+                      "{ " +
+                        "create: always, " +
+                        "node: " +
                         "{" +
                              "type: topic, " +
                              "x-declare: " +
-                             "{ " + 
-                                 "type:direct, " + 
+                             "{ " +
+                                 "type:direct, " +
                                  "auto-delete: true, " +
-                                 "arguments: {" +  
+                                 "arguments: {" +
                                    "'qpid.msg_sequence': 1, " +
                                    "'qpid.ive': 1" +
                                  "}" +
                              "}" +
                         "}" +
                       "}";
-        
+
         AMQDestination dest = new AMQAnyDestination(addr);
-        MessageConsumer cons = jmsSession.createConsumer(dest); 
-        
+        MessageConsumer cons = jmsSession.createConsumer(dest);
+
         assertTrue("Exchange not created as expected",(
                 (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true));
-       
+
         // The existence of the queue is implicitly tested here
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", 
+                (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
                     dest.getQueueName(),"hello", Collections.<String, Object>emptyMap()));
-        
+
         // The client should be able to query and verify the existence of my-exchange (QPID-2774)
         dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}");
-        cons = jmsSession.createConsumer(dest); 
+        cons = jmsSession.createConsumer(dest);
     }
-    
+
     public void testBindQueueWithArgs() throws Exception
     {
         Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
+
         String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
-        
-        String addr = "ADDR:my-queue/hello; " + 
-                      "{ " + 
+
+        String addr = "ADDR:my-queue/hello; " +
+                      "{ " +
                            "create: always, " +
-                           "node: "  + 
-                           "{" + 
+                           "node: "  +
+                           "{" +
                                "durable: true ," +
-                               "x-declare: " + 
-                               "{ " + 
+                               "x-declare: " +
+                               "{ " +
                                      "auto-delete: true," +
                                      "arguments: {'qpid.max_count': 100}" +
                                "}, " +
                                "x-bindings: [{exchange : 'amq.direct', key : test}, " +
-                                            "{exchange : 'amq.topic', key : 'a.#'}," + 
-                                             headersBinding + 
+                                            "{exchange : 'amq.topic', key : 'a.#'}," +
+                                             headersBinding +
                                            "]" +
                            "}" +
                       "}";
 
         AMQDestination dest = new AMQAnyDestination(addr);
-        MessageConsumer cons = jmsSession.createConsumer(dest); 
-        
+        MessageConsumer cons = jmsSession.createConsumer(dest);
+
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
-        
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("", 
+                (AMQSession_0_10)jmsSession).isQueueBound("",
                     dest.getAddressName(),dest.getAddressName(), null));
-        
+
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", 
-                    dest.getAddressName(),"test", null));  
-      
+                (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+                    dest.getAddressName(),"test", null));
+
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", 
+                (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
                     dest.getAddressName(),"a.#", null));
-        
+
         Address a = Address.parse(headersBinding);
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.match", 
+                (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
                     dest.getAddressName(),null, a.getOptions()));
     }
-    
+
     /**
      * Test goal: Verifies the capacity property in address string is handled properly.
      * Test strategy:
@@ -348,22 +348,22 @@ public class AddressBasedDestinationTest
      * Creates consumer with client ack.
      * Sends 15 messages to the queue, tries to receive 10.
      * Tries to receive the 11th message and checks if its null.
-     * 
-     * Since capacity is 10 and we haven't acked any messages, 
+     *
+     * Since capacity is 10 and we haven't acked any messages,
      * we should not have received the 11th.
-     * 
+     *
      * Acks the 10th message and verifies we receive the rest of the msgs.
      */
     public void testCapacity() throws Exception
     {
         verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: 10}}");
     }
-    
+
     public void testSourceAndTargetCapacity() throws Exception
     {
         verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: {source:10, target:15} }}");
     }
-    
+
     private void verifyCapacity(String address) throws Exception
     {
         if (!isCppBroker())
@@ -371,24 +371,24 @@ public class AddressBasedDestinationTest
             _logger.info("Not C++ broker, exiting test");
             return;
         }
-        
+
         Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
-        
+
         AMQDestination dest = new AMQAnyDestination(address);
-        MessageConsumer cons = jmsSession.createConsumer(dest); 
+        MessageConsumer cons = jmsSession.createConsumer(dest);
         MessageProducer prod = jmsSession.createProducer(dest);
-        
+
         for (int i=0; i< 15; i++)
         {
             prod.send(jmsSession.createTextMessage("msg" + i) );
         }
-        
+
         for (int i=0; i< 9; i++)
         {
             cons.receive();
         }
         Message msg = cons.receive(RECEIVE_TIMEOUT);
-        assertNotNull("Should have received the 10th message",msg);        
+        assertNotNull("Should have received the 10th message",msg);
         assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
         msg.acknowledge();
         for (int i=11; i<16; i++)
@@ -396,48 +396,48 @@ public class AddressBasedDestinationTest
             assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT));
         }
     }
-    
+
     /**
      * Test goal: Verifies if the new address format based destinations
      *            can be specified and loaded correctly from the properties file.
-     * 
+     *
      */
     public void testLoadingFromPropertiesFile() throws Exception
     {
-        Hashtable<String,String> map = new Hashtable<String,String>();        
-        map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + 
+        Hashtable<String,String> map = new Hashtable<String,String>();
+        map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " +
                 "{x-declare: {auto-delete: true, arguments : {'qpid.max_size': 1000}}}}");
-        
+
         map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }");
 
         map.put("destination.myQueue3", "BURL:direct://amq.direct/my-queue3?routingkey='test'");
-        
+
         PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory();
         Context ctx = props.getInitialContext(map);
-        
-        AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1");      
+
+        AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1");
         AMQDestination dest2 = (AMQDestination)ctx.lookup("myQueue2");
         AMQDestination dest3 = (AMQDestination)ctx.lookup("myQueue3");
-        
+
         Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
-        MessageConsumer cons1 = jmsSession.createConsumer(dest1); 
+        MessageConsumer cons1 = jmsSession.createConsumer(dest1);
         MessageConsumer cons2 = jmsSession.createConsumer(dest2);
         MessageConsumer cons3 = jmsSession.createConsumer(dest3);
-        
+
         assertTrue("Destination1 was not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true));              
-        
+                (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true));
+
         assertTrue("Destination1 was not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("", 
+                (AMQSession_0_10)jmsSession).isQueueBound("",
                     dest1.getAddressName(),dest1.getAddressName(), null));
-        
+
         assertTrue("Destination2 was not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true));              
-        
+                (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true));
+
         assertTrue("Destination2 was not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("", 
+                (AMQSession_0_10)jmsSession).isQueueBound("",
                     dest2.getAddressName(),dest2.getAddressName(), null));
-        
+
         MessageProducer producer = jmsSession.createProducer(dest3);
         producer.send(jmsSession.createTextMessage("Hello"));
         TextMessage msg = (TextMessage)cons3.receive(1000);
@@ -448,64 +448,64 @@ public class AddressBasedDestinationTest
      * Test goal: Verifies the subject can be overridden using "qpid.subject" message property.
      * Test strategy: Creates and address with a default subject "topic1"
      *                Creates a message with "qpid.subject"="topic2" and sends it.
-     *                Verifies that the message goes to "topic2" instead of "topic1". 
+     *                Verifies that the message goes to "topic2" instead of "topic1".
      */
     public void testOverridingSubject() throws Exception
     {
         Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
-        
+
         AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}");
-        
+
         MessageProducer prod = jmsSession.createProducer(topic1);
-        
+
         Message m = jmsSession.createTextMessage("Hello");
         m.setStringProperty("qpid.subject", "topic2");
-        
+
         MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1);
         MessageConsumer consForTopic2 = jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}"));
-        
+
         prod.send(m);
         Message msg = consForTopic1.receive(1000);
         assertNull("message shouldn't have been sent to topic1",msg);
-        
+
         msg = consForTopic2.receive(1000);
-        assertNotNull("message should have been sent to topic2",msg);        
-        
+        assertNotNull("message should have been sent to topic2",msg);
+
     }
-    
+
     /**
-    * Test goal: Verifies that and address based destination can be used successfully 
+    * Test goal: Verifies that and address based destination can be used successfully
     *            as a reply to.
     */
     public void testAddressBasedReplyTo() throws Exception
     {
         Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
+
         String addr = "ADDR:amq.direct/x512; {create: receiver, " +
-                      "link : {name : 'MY.RESP.QUEUE', " + 
+                      "link : {name : 'MY.RESP.QUEUE', " +
                       "x-declare : { auto-delete: true, exclusive: true, " +
                                    "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring }} } }";
-        
+
         Destination replyTo = new AMQAnyDestination(addr);
         Destination dest =new AMQAnyDestination("ADDR:amq.direct/Hello");
-        
-        MessageConsumer cons = jmsSession.createConsumer(dest);                
+
+        MessageConsumer cons = jmsSession.createConsumer(dest);
         MessageProducer prod = jmsSession.createProducer(dest);
         Message m = jmsSession.createTextMessage("Hello");
         m.setJMSReplyTo(replyTo);
         prod.send(m);
-        
+
         Message msg = cons.receive(1000);
         assertNotNull("consumer should have received the message",msg);
-        
+
         MessageConsumer replyToCons = jmsSession.createConsumer(replyTo);
         MessageProducer replyToProd = jmsSession.createProducer(msg.getJMSReplyTo());
         replyToProd.send(jmsSession.createTextMessage("reply"));
-        
+
         Message replyToMsg = replyToCons.receive(1000);
-        assertNotNull("The reply to consumer should have got the message",replyToMsg);        
+        assertNotNull("The reply to consumer should have got the message",replyToMsg);
     }
-    
+
     /**
      * Test goal: Verifies that session.createQueue method
      *            works as expected both with the new and old addressing scheme.
@@ -513,46 +513,61 @@ public class AddressBasedDestinationTest
     public void testSessionCreateQueue() throws Exception
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
+
         // Using the BURL method
         Destination queue = ssn.createQueue("my-queue");
-        MessageProducer prod = ssn.createProducer(queue); 
+        MessageProducer prod = ssn.createProducer(queue);
         MessageConsumer cons = ssn.createConsumer(queue);
         assertTrue("my-queue was not created as expected",(
-                (AMQSession_0_10)ssn).isQueueBound("amq.direct", 
+                (AMQSession_0_10)ssn).isQueueBound("amq.direct",
                     "my-queue","my-queue", null));
-        
+
         prod.send(ssn.createTextMessage("test"));
         assertNotNull("consumer should receive a message",cons.receive(1000));
         cons.close();
-        
+
         // Using the ADDR method
+        // default case
         queue = ssn.createQueue("ADDR:my-queue2");
-        prod = ssn.createProducer(queue); 
+        try
+        {
+        	prod = ssn.createProducer(queue);
+        	fail("The client should throw an exception, since there is no queue present in the broker");
+        }
+        catch(Exception e)
+        {
+        	String s = "The name 'my-queue2' supplied in the address " +
+        			"doesn't resolve to an exchange or a queue";
+        	assertEquals(s,e.getCause().getCause().getMessage());
+        }
+
+        // explicit create case
+        queue = ssn.createQueue("ADDR:my-queue2; {create: sender}");
+        prod = ssn.createProducer(queue);
         cons = ssn.createConsumer(queue);
         assertTrue("my-queue2 was not created as expected",(
-                (AMQSession_0_10)ssn).isQueueBound("", 
+                (AMQSession_0_10)ssn).isQueueBound("",
                     "my-queue2","my-queue2", null));
-        
+
         prod.send(ssn.createTextMessage("test"));
         assertNotNull("consumer should receive a message",cons.receive(1000));
         cons.close();
-        
+
         // Using the ADDR method to create a more complicated queue
         String addr = "ADDR:amq.direct/x512; {create: receiver, " +
-        "link : {name : 'MY.RESP.QUEUE', " + 
+        "link : {name : 'MY.RESP.QUEUE', " +
         "x-declare : { auto-delete: true, exclusive: true, " +
                      "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }";
         queue = ssn.createQueue(addr);
-        
-        prod = ssn.createProducer(queue); 
+
+        prod = ssn.createProducer(queue);
         cons = ssn.createConsumer(queue);
         assertTrue("MY.RESP.QUEUE was not created as expected",(
-                (AMQSession_0_10)ssn).isQueueBound("amq.direct", 
+                (AMQSession_0_10)ssn).isQueueBound("amq.direct",
                     "MY.RESP.QUEUE","x512", null));
         cons.close();
     }
-    
+
     /**
      * Test goal: Verifies that session.creatTopic method
      *            works as expected both with the new and old addressing scheme.
@@ -560,71 +575,71 @@ public class AddressBasedDestinationTest
     public void testSessionCreateTopic() throws Exception
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
+
         // Using the BURL method
         Topic topic = ssn.createTopic("ACME");
-        MessageProducer prod = ssn.createProducer(topic); 
+        MessageProducer prod = ssn.createProducer(topic);
         MessageConsumer cons = ssn.createConsumer(topic);
-        
+
         prod.send(ssn.createTextMessage("test"));
         assertNotNull("consumer should receive a message",cons.receive(1000));
         cons.close();
-     
+
         // Using the ADDR method
         topic = ssn.createTopic("ADDR:ACME");
-        prod = ssn.createProducer(topic); 
+        prod = ssn.createProducer(topic);
         cons = ssn.createConsumer(topic);
-        
+
         prod.send(ssn.createTextMessage("test"));
         assertNotNull("consumer should receive a message",cons.receive(1000));
         cons.close();
-        
-        String addr = "ADDR:vehicles/bus; " + 
-        "{ " + 
-          "create: always, " +                        
-          "node: " + 
+
+        String addr = "ADDR:vehicles/bus; " +
+        "{ " +
+          "create: always, " +
+          "node: " +
           "{" +
                "type: topic, " +
                "x-declare: " +
-               "{ " + 
-                   "type:direct, " + 
+               "{ " +
+                   "type:direct, " +
                    "auto-delete: true, " +
-                   "arguments: {" +  
+                   "arguments: {" +
                        "'qpid.msg_sequence': 1, " +
-                       "'qpid.ive': 1" + 
+                       "'qpid.ive': 1" +
                    "}" +
                "}" +
           "}, " +
           "link: {name : my-topic, " +
               "x-bindings: [{exchange : 'vehicles', key : car}, " +
-                           "{exchange : 'vehicles', key : van}]" + 
-          "}" + 
+                           "{exchange : 'vehicles', key : van}]" +
+          "}" +
         "}";
-        
+
         // Using the ADDR method to create a more complicated topic
         topic = ssn.createTopic(addr);
-        prod = ssn.createProducer(topic); 
+        prod = ssn.createProducer(topic);
         cons = ssn.createConsumer(topic);
-        
+
         assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("vehicles", 
+                (AMQSession_0_10)ssn).isQueueBound("vehicles",
                     "my-topic","bus", null));
-        
+
         assertTrue("The queue was not bound to vehicle exchange using car as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("vehicles", 
+                (AMQSession_0_10)ssn).isQueueBound("vehicles",
                     "my-topic","car", null));
-        
+
         assertTrue("The queue was not bound to vehicle exchange using van as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("vehicles", 
+                (AMQSession_0_10)ssn).isQueueBound("vehicles",
                     "my-topic","van", null));
-        
+
         Message msg = ssn.createTextMessage("test");
         msg.setStringProperty("qpid.subject", "van");
         prod.send(msg);
         assertNotNull("consumer should receive a message",cons.receive(1000));
         cons.close();
     }
-    
+
     /**
      * Test Goal : Verify the default subjects used for each exchange type.
      * The default for amq.topic is "#" and for the rest it's ""
@@ -632,92 +647,92 @@ public class AddressBasedDestinationTest
     public void testDefaultSubjects() throws Exception
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
+
         MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct"));
         MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic"));
-        
+
         MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct"));
         MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather"));
         MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales"));
-        
+
         queueProducer.send(ssn.createBytesMessage());
         assertNotNull("The consumer subscribed to amq.direct " +
         		"with empty binding key should have received the message ",queueCons.receive(1000));
-        
+
         topicProducer1.send(ssn.createTextMessage("25c"));
         assertEquals("The consumer subscribed to amq.topic " +
                 "with '#' binding key should have received the message ",
                 ((TextMessage)topicCons.receive(1000)).getText(),"25c");
-        
+
         topicProducer2.send(ssn.createTextMessage("1000"));
         assertEquals("The consumer subscribed to amq.topic " +
                 "with '#' binding key should have received the message ",
                 ((TextMessage)topicCons.receive(1000)).getText(),"1000");
     }
-    
+
     /**
      * Test Goal : Verify that 'mode : browse' works as expected using a regular consumer.
      *             This indirectly tests ring queues as well.
      */
     public void testBrowseMode() throws Exception
     {
-        
+
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
+
         String addr = "ADDR:my-ring-queue; {create: always, mode: browse, " +
             "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " +
                    "x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2}}}}";
-        
+
         Destination dest = ssn.createQueue(addr);
         MessageConsumer browseCons = ssn.createConsumer(dest);
         MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
-        
+
         prod.send(ssn.createTextMessage("Test1"));
         prod.send(ssn.createTextMessage("Test2"));
-        
+
         TextMessage msg = (TextMessage)browseCons.receive(1000);
         assertEquals("Didn't receive the first message",msg.getText(),"Test1");
-        
+
         msg = (TextMessage)browseCons.receive(1000);
         assertEquals("Didn't receive the first message",msg.getText(),"Test2");
-        
-        browseCons.close();                
+
+        browseCons.close();
         prod.send(ssn.createTextMessage("Test3"));
         browseCons = ssn.createConsumer(dest);
-        
+
         msg = (TextMessage)browseCons.receive(1000);
         assertEquals("Should receive the second message again",msg.getText(),"Test2");
-     
+
         msg = (TextMessage)browseCons.receive(1000);
         assertEquals("Should receive the third message since it's a ring queue",msg.getText(),"Test3");
-        
+
         assertNull("Should not receive anymore messages",browseCons.receive(500));
     }
-    
+
     /**
      * Test Goal : When the same destination is used when creating two consumers,
-     *             If the type == topic, verify that unique subscription queues are created, 
+     *             If the type == topic, verify that unique subscription queues are created,
      *             unless subscription queue has a name.
-     *             
+     *
      *             If the type == queue, same queue should be shared.
      */
     public void testSubscriptionForSameDestination() throws Exception
     {
-        Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        
+        Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         Destination dest = ssn.createTopic("ADDR:amq.topic/foo");
         MessageConsumer consumer1 = ssn.createConsumer(dest);
         MessageConsumer consumer2 = ssn.createConsumer(dest);
         MessageProducer prod = ssn.createProducer(dest);
-        
+
         prod.send(ssn.createTextMessage("A"));
         TextMessage m = (TextMessage)consumer1.receive(1000);
         assertEquals("Consumer1 should recieve message A",m.getText(),"A");
         m = (TextMessage)consumer2.receive(1000);
         assertEquals("Consumer2 should recieve message A",m.getText(),"A");
-        
+
         consumer1.close();
         consumer2.close();
-        
+
         dest = ssn.createTopic("ADDR:amq.topic/foo; { link: {name: my-queue}}");
         consumer1 = ssn.createConsumer(dest);
         try
@@ -726,60 +741,60 @@ public class AddressBasedDestinationTest
             fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber");
         }
         catch(Exception e)
-        {            
+        {
         }
         _connection.close();
-        
+
         _connection = getConnection() ;
         _connection.start();
-        ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        
+        ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         dest = ssn.createTopic("ADDR:my_queue; {create: always}");
         consumer1 = ssn.createConsumer(dest);
         consumer2 = ssn.createConsumer(dest);
         prod = ssn.createProducer(dest);
-        
+
         prod.send(ssn.createTextMessage("A"));
-        Message m1 = consumer1.receive(1000); 
+        Message m1 = consumer1.receive(1000);
         Message m2 = consumer2.receive(1000);
-        
+
         if (m1 != null)
         {
-            assertNull("Only one consumer should receive the message",m2);  
+            assertNull("Only one consumer should receive the message",m2);
         }
         else
         {
-            assertNotNull("Only one consumer should receive the message",m2);  
+            assertNotNull("Only one consumer should receive the message",m2);
         }
     }
- 
+
     public void testXBindingsWithoutExchangeName() throws Exception
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         String addr = "ADDR:MRKT; " +
         		"{" +
-        		    "create: receiver," + 
+        		    "create: receiver," +
         		    "node : {type: topic, x-declare: {type: topic} },"  +
         		    "link:{" +
         		         "name: my-topic," +
         		         "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" +
         		         "}" +
         		"}";
-        
+
         // Using the ADDR method to create a more complicated topic
         MessageConsumer  cons = ssn.createConsumer(new AMQAnyDestination(addr));
-        
+
         assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("MRKT", 
+                (AMQSession_0_10)ssn).isQueueBound("MRKT",
                     "my-topic","NYSE.#", null));
-        
+
         assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("MRKT", 
+                (AMQSession_0_10)ssn).isQueueBound("MRKT",
                     "my-topic","NASDAQ.#", null));
-        
+
         assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("MRKT", 
+                (AMQSession_0_10)ssn).isQueueBound("MRKT",
                     "my-topic","CNTL.#", null));
-        
+
         MessageProducer prod = ssn.createProducer(ssn.createTopic(addr));
         Message msg = ssn.createTextMessage("test");
         msg.setStringProperty("qpid.subject", "NASDAQ.ABCD");
@@ -787,7 +802,7 @@ public class AddressBasedDestinationTest
         assertNotNull("consumer should receive a message",cons.receive(1000));
         cons.close();
     }
-    
+
     public void testXSubscribeOverrides() throws Exception
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -800,57 +815,57 @@ public class AddressBasedDestinationTest
             fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber");
         }
         catch(Exception e)
-        {            
+        {
         }
     }
-    
+
     public void testQueueReceiversAndTopicSubscriber() throws Exception
     {
         Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
         Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
-        
+
         QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
         QueueReceiver receiver = qSession.createReceiver(queue);
-        
+
         TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         TopicSubscriber sub = tSession.createSubscriber(topic);
-        
+
         Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
         prod1.send(ssn.createTextMessage("test1"));
-        
+
         MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
         prod2.send(ssn.createTextMessage("test2"));
-        
+
         Message msg1 = receiver.receive();
         assertNotNull(msg1);
         assertEquals("test1",((TextMessage)msg1).getText());
-        
+
         Message msg2 = sub.receive();
         assertNotNull(msg2);
-        assertEquals("test2",((TextMessage)msg2).getText());  
+        assertEquals("test2",((TextMessage)msg2).getText());
     }
-    
+
     public void testDurableSubscriber() throws Exception
     {
-        Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        
+        Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         Topic topic = ssn.createTopic("news.us");
-        
+
         MessageConsumer cons = ssn.createDurableSubscriber(topic, "my-sub");
         MessageProducer prod = ssn.createProducer(topic);
-        
+
         Message m = ssn.createTextMessage("A");
         prod.send(m);
         Message msg = cons.receive(1000);
         assertNotNull(msg);
         assertEquals("A",((TextMessage)msg).getText());
     }
-    
+
     public void testDeleteOptions() throws Exception
     {
         Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         MessageConsumer cons;
-        
+
         // default (create never, assert never) -------------------
         // create never --------------------------------------------
         String addr1 = "ADDR:testQueue1;{create: always, delete: always}";
@@ -864,11 +879,11 @@ public class AddressBasedDestinationTest
         {
             fail("Exception should not be thrown. Exception thrown is : " + e);
         }
-        
+
         assertFalse("Queue not deleted as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));  
-        
-        
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
         String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
         dest = new AMQAnyDestination(addr2);
         try
@@ -880,11 +895,11 @@ public class AddressBasedDestinationTest
         {
             fail("Exception should not be thrown. Exception thrown is : " + e);
         }
-        
+
         assertFalse("Queue not deleted as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));  
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
 
-        
         String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
         dest = new AMQAnyDestination(addr3);
         try
@@ -897,45 +912,45 @@ public class AddressBasedDestinationTest
         {
             fail("Exception should not be thrown. Exception thrown is : " + e);
         }
-        
+
         assertFalse("Queue not deleted as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));  
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
 
-        
     }
-    
+
     /**
      * Test Goals : 1. Test if the client sets the correct accept mode for unreliable
      *                and at-least-once.
      *             2. Test default reliability modes for Queues and Topics.
      *             3. Test if an exception is thrown if exactly-once is used.
      *             4. Test if an exception is thrown if at-least-once is used with topics.
-     * 
+     *
      * Test Strategy: For goal #1 & #2
      *                For unreliable and at-least-once the test tries to receives messages
      *                in client_ack mode but does not ack the messages.
      *                It will then close the session, recreate a new session
      *                and will then try to verify the queue depth.
      *                For unreliable the messages should have been taken off the queue.
-     *                For at-least-once the messages should be put back onto the queue.    
-     * 
+     *                For at-least-once the messages should be put back onto the queue.
+     *
      */
-   
+
     public void testReliabilityOptions() throws Exception
     {
         String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}";
         acceptModeTest(addr1,0);
-        
+
         String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}";
         acceptModeTest(addr2,2);
-        
+
         // Default accept-mode for topics
-        acceptModeTest("ADDR:amq.topic/test",0);        
-        
+        acceptModeTest("ADDR:amq.topic/test",0);
+
         // Default accept-mode for queues
         acceptModeTest("ADDR:testQueue1;{create: always}",2);
-               
-        String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";        
+
+        String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";
         try
         {
             AMQAnyDestination dest = new AMQAnyDestination(addr3);
@@ -945,8 +960,8 @@ public class AddressBasedDestinationTest
         {
             assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported"));
         }
-        
-        String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";        
+
+        String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";
         try
         {
             AMQAnyDestination dest = new AMQAnyDestination(addr4);
@@ -959,34 +974,50 @@ public class AddressBasedDestinationTest
             assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics"));
         }
     }
-    
+
     private void acceptModeTest(String address, int expectedQueueDepth) throws Exception
     {
         Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
         MessageConsumer cons;
         MessageProducer prod;
-        
+
         AMQDestination  dest = new AMQAnyDestination(address);
         cons = ssn.createConsumer(dest);
         prod = ssn.createProducer(dest);
-        
+
         for (int i=0; i < expectedQueueDepth; i++)
         {
             prod.send(ssn.createTextMessage("Msg" + i));
         }
-        
+
         for (int i=0; i < expectedQueueDepth; i++)
         {
             Message msg = cons.receive(1000);
             assertNotNull(msg);
             assertEquals("Msg" + i,((TextMessage)msg).getText());
         }
-        
+
         ssn.close();
         ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
-        long queueDepth = ((AMQSession) ssn).getQueueDepth(dest);        
-        assertEquals(expectedQueueDepth,queueDepth);        
+        long queueDepth = ((AMQSession) ssn).getQueueDepth(dest);
+        assertEquals(expectedQueueDepth,queueDepth);
+        cons.close();
+        prod.close();
+    }
+
+    public void testDestinationOnSend() throws Exception
+    {
+    	Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer cons = ssn.createConsumer(ssn.createTopic("amq.topic/test"));
+        MessageProducer prod = ssn.createProducer(null);
+
+        Queue queue = ssn.createQueue("amq.topic/test");
+        prod.send(queue,ssn.createTextMessage("A"));
+
+        Message msg = cons.receive(1000);
+        assertNotNull(msg);
+        assertEquals("A",((TextMessage)msg).getText());
+        prod.close();
         cons.close();
-        prod.close();        
     }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message