qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1172657 [5/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/examples/cshar...
Date Mon, 19 Sep 2011 15:13:38 GMT
Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Mon Sep 19 15:13:18 2011
@@ -25,21 +25,22 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
 #include "qpid/sys/Poller.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/log/Statement.h"
 
 #include <boost/bind.hpp>
-#include <memory>
+#include <boost/ptr_container/ptr_vector.hpp>
 
 namespace qpid {
 namespace sys {
 
 class AsynchIOProtocolFactory : public ProtocolFactory {
     const bool tcpNoDelay;
-    Socket listener;
-    const uint16_t listeningPort;
-    std::auto_ptr<AsynchAcceptor> acceptor;
+    boost::ptr_vector<Socket> listeners;
+    boost::ptr_vector<AsynchAcceptor> acceptors;
+    uint16_t listeningPort;
 
   public:
     AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay);
@@ -71,15 +72,38 @@ static class TCPIOPlugin : public Plugin
                     "", boost::lexical_cast<std::string>(opts.port),
                     opts.connectionBacklog,
                     opts.tcpNoDelay));
-            QPID_LOG(notice, "Listening on TCP port " << protocolt->getPort());
+            QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
             broker->registerProtocolFactory("tcp", protocolt);
         }
     }
 } tcpPlugin;
 
 AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) :
-    tcpNoDelay(nodelay), listeningPort(listener.listen(host, port, backlog))
-{}
+    tcpNoDelay(nodelay)
+{
+    SocketAddress sa(host, port);
+
+    // We must have at least one resolved address
+    QPID_LOG(info, "Listening to: " << sa.asString())
+    Socket* s = new Socket;
+    uint16_t lport = s->listen(sa, backlog);
+    QPID_LOG(debug, "Listened to: " << lport);
+    listeners.push_back(s);
+
+    listeningPort = lport;
+
+    // Try any other resolved addresses
+    while (sa.nextAddress()) {
+        // Hack to ensure that all listening connections are on the same port
+        sa.setAddrInfoPort(listeningPort);
+        QPID_LOG(info, "Listening to: " << sa.asString())
+        Socket* s = new Socket;
+        uint16_t lport = s->listen(sa, backlog);
+        QPID_LOG(debug, "Listened to: " << lport);
+        listeners.push_back(s);
+    }
+
+}
 
 void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
                                           ConnectionCodec::Factory* f, bool isClient) {
@@ -111,10 +135,12 @@ uint16_t AsynchIOProtocolFactory::getPor
 
 void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
                                      ConnectionCodec::Factory* fact) {
-    acceptor.reset(
-        AsynchAcceptor::create(listener,
-                           boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
-    acceptor->start(poller);
+    for (unsigned i = 0; i<listeners.size(); ++i) {
+        acceptors.push_back(
+            AsynchAcceptor::create(listeners[i],
+                            boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+        acceptors[i].start(poller);
+    }
 }
 
 void AsynchIOProtocolFactory::connectFailed(
@@ -138,6 +164,7 @@ void AsynchIOProtocolFactory::connect(
     // shutdown.  The allocated AsynchConnector frees itself when it
     // is no longer needed.
     Socket* socket = new Socket();
+    try {
     AsynchConnector* c = AsynchConnector::create(
         *socket,
         host,
@@ -147,6 +174,12 @@ void AsynchIOProtocolFactory::connect(
         boost::bind(&AsynchIOProtocolFactory::connectFailed,
                     this, _1, _2, _3, failed));
     c->start(poller);
+    } catch (std::exception&) {
+        // TODO: Design question - should we do the error callback and also throw?
+        int errCode = socket->getError();
+        connectFailed(*socket, errCode, strError(errCode), failed);
+        throw;
+    }
 }
 
 }} // namespace qpid::sys

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/TimerWarnings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/TimerWarnings.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/TimerWarnings.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/TimerWarnings.cpp Mon Sep 19 15:13:18 2011
@@ -56,18 +56,18 @@ void TimerWarnings::log() {
             std::string task = i->first;
             TaskStats& stats = i->second;
             if (stats.lateDelay.count)
-                QPID_LOG(warning, task << " task late "
+                QPID_LOG(info, task << " task late "
                          << stats.lateDelay.count << " times by "
                          << stats.lateDelay.average()/TIME_MSEC << "ms on average.");
 
             if (stats.overranOverrun.count)
-                QPID_LOG(warning, task << " task overran "
+                QPID_LOG(info, task << " task overran "
                          << stats.overranOverrun.count << " times by "
                          << stats.overranOverrun.average()/TIME_MSEC << "ms (taking "
                          << stats.overranTime.average() << "ns) on average.");
 
             if (stats.lateAndOverranOverrun.count)
-                QPID_LOG(warning, task << " task late and overran "
+                QPID_LOG(info, task << " task late and overran "
                          << stats.lateAndOverranOverrun.count << " times: late "
                          << stats.lateAndOverranDelay.average()/TIME_MSEC << "ms, overran "
                          << stats.lateAndOverranOverrun.average()/TIME_MSEC << "ms (taking "

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp Mon Sep 19 15:13:18 2011
@@ -57,6 +57,7 @@ size_t CyrusSecurityLayer::decode(const 
             copied += count;
             decodeBuffer.position += count;
             size_t decodedSize = codec->decode(decodeBuffer.data, decodeBuffer.position);
+            if (decodedSize == 0) break;
             if (decodedSize < decodeBuffer.position) {
                 ::memmove(decodeBuffer.data, decodeBuffer.data + decodedSize, decodeBuffer.position - decodedSize);
             }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Sep 19 15:13:18 2011
@@ -384,7 +384,12 @@ void PollerPrivate::resetMode(PollerHand
         epe.data.u64 = 0; // Keep valgrind happy
         epe.data.ptr = &eh;
 
-        QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
+        int rc = ::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe);
+        // If something has closed the fd in the meantime try adding it back
+        if (rc ==-1 && errno == ENOENT) {
+            rc = ::epoll_ctl(epollFd, EPOLL_CTL_ADD, eh.fd(), &epe);
+        }
+        QPID_POSIX_CHECK(rc);
 
         eh.setActive();
         return;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Mon Sep 19 15:13:18 2011
@@ -149,6 +149,7 @@ private:
     ConnectedCallback connCallback;
     FailedCallback failCallback;
     const Socket& socket;
+    SocketAddress sa;
 
 public:
     AsynchConnector(const Socket& socket,
@@ -171,11 +172,13 @@ AsynchConnector::AsynchConnector(const S
                    boost::bind(&AsynchConnector::connComplete, this, _1)),
     connCallback(connCb),
     failCallback(failCb),
-    socket(s)
+    socket(s),
+    sa(hostname, port)
 {
     socket.setNonblocking();
-    SocketAddress sa(hostname, port);
+
     // Note, not catching any exceptions here, also has effect of destructing
+    QPID_LOG(info, "Connecting: " << sa.asString());
     socket.connect(sa);
 }
 
@@ -191,11 +194,26 @@ void AsynchConnector::stop()
 
 void AsynchConnector::connComplete(DispatchHandle& h)
 {
-    h.stopWatch();
     int errCode = socket.getError();
     if (errCode == 0) {
+        h.stopWatch();
         connCallback(socket);
     } else {
+        // Retry while we cause an immediate exception
+        // (asynch failure will be handled by re-entering here at the top)
+        while (sa.nextAddress()) {
+            try {
+                // Try next address without deleting ourselves
+                QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode));
+                QPID_LOG(info, "Retrying connect: " << sa.asString());
+                socket.connect(sa);
+                return;
+            } catch (const std::exception& e) {
+                QPID_LOG(debug, "Ignored socket connect exception: " << e.what());
+            }
+            errCode = socket.getError();
+        }
+        h.stopWatch();
         failCallback(socket, errCode, strError(errCode));
     }
     DispatchHandle::doDelete();

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/Socket.cpp Mon Sep 19 15:13:18 2011
@@ -34,9 +34,6 @@
 #include <netdb.h>
 #include <cstdlib>
 #include <string.h>
-#include <iostream>
-
-#include <boost/format.hpp>
 
 namespace qpid {
 namespace sys {
@@ -44,24 +41,28 @@ namespace sys {
 namespace {
 std::string getName(int fd, bool local)
 {
-    ::sockaddr_storage name; // big enough for any socket address
-    ::socklen_t namelen = sizeof(name);
+    ::sockaddr_storage name_s; // big enough for any socket address
+    ::sockaddr* name = (::sockaddr*)&name_s;
+    ::socklen_t namelen = sizeof(name_s);
 
-    int result = -1;
     if (local) {
-        result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
+        QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) );
     } else {
-        result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
+        QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) );
     }
-    QPID_POSIX_CHECK(result);
 
-    char servName[NI_MAXSERV];
-    char dispName[NI_MAXHOST];
-    if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
-                                servName, sizeof(servName),
-                                NI_NUMERICHOST | NI_NUMERICSERV) != 0)
-        throw QPID_POSIX_ERROR(rc);
-    return std::string(dispName) + ":" + std::string(servName);
+    return SocketAddress::asString(name, namelen);
+}
+
+uint16_t getLocalPort(int fd)
+{
+    ::sockaddr_storage name_s; // big enough for any socket address
+    ::sockaddr* name = (::sockaddr*)&name_s;
+    ::socklen_t namelen = sizeof(name_s);
+
+    QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) );
+
+    return SocketAddress::getPort(name);
 }
 }
 
@@ -88,6 +89,11 @@ void Socket::createSocket(const SocketAd
     try {
         if (nonblocking) setNonblocking();
         if (nodelay) setTcpNoDelay();
+        if (getAddrInfo(sa).ai_family == AF_INET6) {
+            int flag = 1;
+            int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag));
+            QPID_POSIX_CHECK(result);
+        }
     } catch (std::exception&) {
         ::close(s);
         socket = -1;
@@ -95,6 +101,20 @@ void Socket::createSocket(const SocketAd
     }
 }
 
+Socket* Socket::createSameTypeSocket() const {
+    int& socket = impl->fd;
+    // Socket currently has no actual socket attached
+    if (socket == -1)
+        return new Socket;
+
+    ::sockaddr_storage sa;
+    ::socklen_t salen = sizeof(sa);
+    QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+    int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+    if (s < 0) throw QPID_POSIX_ERROR(errno);
+    return new Socket(new IOHandlePrivate(s));
+}
+
 void Socket::setNonblocking() const {
     int& socket = impl->fd;
     nonblocking = true;
@@ -109,7 +129,7 @@ void Socket::setTcpNoDelay() const
     nodelay = true;
     if (socket != -1) {
         int flag = 1;
-        int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+        int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
         QPID_POSIX_CHECK(result);
     }
 }
@@ -179,19 +199,14 @@ int Socket::listen(const SocketAddress& 
 
     const int& socket = impl->fd;
     int yes=1;
-    QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
+    QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
 
     if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
         throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
     if (::listen(socket, backlog) < 0)
         throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
 
-    struct sockaddr_in name;
-    socklen_t namelen = sizeof(name);
-    if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
-        throw QPID_POSIX_ERROR(errno);
-
-    return ntohs(name.sin_port);
+    return getLocalPort(socket);
 }
 
 Socket* Socket::accept() const

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp Mon Sep 19 15:13:18 2011
@@ -21,13 +21,13 @@
 
 #include "qpid/sys/SocketAddress.h"
 
-#include "qpid/sys/posix/check.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
 
 #include <sys/socket.h>
-#include <string.h>
+#include <netinet/in.h>
 #include <netdb.h>
-
-#include <algorithm>
+#include <string.h>
 
 namespace qpid {
 namespace sys {
@@ -61,31 +61,70 @@ SocketAddress::~SocketAddress()
     }
 }
 
-std::string SocketAddress::asString(bool numeric) const
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen)
 {
-    if (!numeric)
-        return host + ":" + port;
-    // Canonicalise into numeric id
-    const ::addrinfo& ai = getAddrInfo(*this);
     char servName[NI_MAXSERV];
     char dispName[NI_MAXHOST];
-    if (int rc=::getnameinfo(ai.ai_addr, ai.ai_addrlen,
-                            dispName, sizeof(dispName),
-                            servName, sizeof(servName),
-                            NI_NUMERICHOST | NI_NUMERICSERV) != 0)
-        throw QPID_POSIX_ERROR(rc);
-    std::string s(dispName);
+    if (int rc=::getnameinfo(addr, addrlen,
+        dispName, sizeof(dispName),
+                             servName, sizeof(servName),
+                             NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+        throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+    std::string s;
+    switch (addr->sa_family) {
+        case AF_INET: s += dispName; break;
+        case AF_INET6: s += "["; s += dispName; s+= "]"; break;
+        default: throw Exception(QPID_MSG("Unexpected socket type"));
+    }
     s += ":";
     s += servName;
     return s;
 }
 
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
+{
+    switch (addr->sa_family) {
+        case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port);
+        case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port);
+        default:throw Exception(QPID_MSG("Unexpected socket type"));
+    }
+}
+
+std::string SocketAddress::asString(bool numeric) const
+{
+    if (!numeric)
+        return host + ":" + port;
+    // Canonicalise into numeric id
+    const ::addrinfo& ai = getAddrInfo(*this);
+
+    return asString(ai.ai_addr, ai.ai_addrlen);
+}
+
+bool SocketAddress::nextAddress() {
+    bool r = currentAddrInfo->ai_next != 0;
+    if (r)
+        currentAddrInfo = currentAddrInfo->ai_next;
+    return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+    if (!currentAddrInfo) return;
+
+    ::addrinfo& ai = *currentAddrInfo;
+    switch (ai.ai_family) {
+    case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+    case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+    default: throw Exception(QPID_MSG("Unexpected socket type"));
+    }
+}
+
 const ::addrinfo& getAddrInfo(const SocketAddress& sa)
 {
     if (!sa.addrInfo) {
         ::addrinfo hints;
         ::memset(&hints, 0, sizeof(hints));
-        hints.ai_family = AF_INET; // Change this to support IPv6
+        hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+        hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
         hints.ai_socktype = SOCK_STREAM;
 
         const char* node = 0;
@@ -99,9 +138,10 @@ const ::addrinfo& getAddrInfo(const Sock
         int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
         if (n != 0)
             throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+        sa.currentAddrInfo = sa.addrInfo;
     }
 
-    return *sa.addrInfo;
+    return *sa.currentAddrInfo;
 }
 
 }}

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Mon Sep 19 15:13:18 2011
@@ -47,16 +47,13 @@ namespace {
 
 /*
  * The function pointers for AcceptEx and ConnectEx need to be looked up
- * at run time. Make sure this is done only once.
+ * at run time.
  */
-boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
-LPFN_ACCEPTEX fnAcceptEx = 0;
-typedef void (*lookUpFunc)(const qpid::sys::Socket &);
-
-void lookUpAcceptEx() {
-    SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
+    SOCKET h = toSocketHandle(s);
     GUID guidAcceptEx = WSAID_ACCEPTEX;
     DWORD dwBytes = 0;
+    LPFN_ACCEPTEX fnAcceptEx;
     WSAIoctl(h,
              SIO_GET_EXTENSION_FUNCTION_POINTER,
              &guidAcceptEx,
@@ -66,9 +63,9 @@ void lookUpAcceptEx() {
              &dwBytes,
              NULL,
              NULL);
-    closesocket(h);
     if (fnAcceptEx == 0)
         throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+    return fnAcceptEx;
 }
 
 }
@@ -95,18 +92,15 @@ private:
 
     AsynchAcceptor::Callback acceptedCallback;
     const Socket& socket;
+    const LPFN_ACCEPTEX fnAcceptEx;
 };
 
 AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
   : acceptedCallback(callback),
-    socket(s) {
+    socket(s),
+    fnAcceptEx(lookUpAcceptEx(s)) {
 
     s.setNonblocking();
-#if (BOOST_VERSION >= 103500)   /* boost 1.35 or later reversed the args */
-    boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
-#else
-    boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
-#endif
 }
 
 AsynchAcceptor::~AsynchAcceptor()
@@ -124,25 +118,26 @@ void AsynchAcceptor::restart(void) {
     DWORD bytesReceived = 0;  // Not used, needed for AcceptEx API
     AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
                                                         this,
-                                                        toSocketHandle(socket));
+                                                        socket);
     BOOL status;
-    status = ::fnAcceptEx(toSocketHandle(socket),
-                          toSocketHandle(*result->newSocket),
-                          result->addressBuffer,
-                          0,
-                          AsynchAcceptResult::SOCKADDRMAXLEN,
-                          AsynchAcceptResult::SOCKADDRMAXLEN,
-                          &bytesReceived,
-                          result->overlapped());
+    status = fnAcceptEx(toSocketHandle(socket),
+                        toSocketHandle(*result->newSocket),
+                        result->addressBuffer,
+                        0,
+                        AsynchAcceptResult::SOCKADDRMAXLEN,
+                        AsynchAcceptResult::SOCKADDRMAXLEN,
+                        &bytesReceived,
+                        result->overlapped());
     QPID_WINDOWS_CHECK_ASYNC_START(status);
 }
 
 
 AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
                                        AsynchAcceptor *acceptor,
-                                       SOCKET listener)
-  : callback(cb), acceptor(acceptor), listener(listener) {
-    newSocket.reset (new Socket());
+                                       const Socket& listener)
+  : callback(cb), acceptor(acceptor),
+    listener(toSocketHandle(listener)),
+    newSocket(listener.createSameTypeSocket()) {
 }
 
 void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h Mon Sep 19 15:13:18 2011
@@ -83,17 +83,17 @@ class AsynchAcceptResult : public Asynch
 public:
     AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb,
                        AsynchAcceptor *acceptor,
-                       SOCKET listener);
+                       const qpid::sys::Socket& listener);
     virtual void success (size_t bytesTransferred);
     virtual void failure (int error);
 
 private:
     virtual void complete(void) {}  // No-op for this class.
 
-    std::auto_ptr<qpid::sys::Socket> newSocket;
     qpid::sys::AsynchAcceptor::Callback callback;
     AsynchAcceptor *acceptor;
     SOCKET listener;
+    std::auto_ptr<qpid::sys::Socket> newSocket;
 
     // AcceptEx needs a place to write the local and remote addresses
     // when accepting the connection. Place those here; get enough for

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/Socket.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/Socket.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/Socket.cpp Mon Sep 19 15:13:18 2011
@@ -19,25 +19,19 @@
  *
  */
 
-// Ensure we get all of winsock2.h
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-
 #include "qpid/sys/Socket.h"
+
 #include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/windows/IoHandlePrivate.h"
 #include "qpid/sys/windows/check.h"
-#include "qpid/sys/Time.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
 
-#include <cstdlib>
-#include <string.h>
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
 
 #include <winsock2.h>
 
-#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
-
 // Need to initialize WinSock. Ideally, this would be a singleton or embedded
 // in some one-time initialization function. I tried boost singleton and could
 // not get it to compile (and others located in google had the same problem).
@@ -91,22 +85,28 @@ namespace {
 
 std::string getName(SOCKET fd, bool local)
 {
-    sockaddr_in name; // big enough for any socket address    
-    socklen_t namelen = sizeof(name);
+    ::sockaddr_storage name_s; // big enough for any socket address
+    ::sockaddr* name = (::sockaddr*)&name_s;
+    ::socklen_t namelen = sizeof(name_s);
+
     if (local) {
-        QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
+        QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
     } else {
-        QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
+        QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen));
     }
 
-    char servName[NI_MAXSERV];
-    char dispName[NI_MAXHOST];
-    if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
-                               dispName, sizeof(dispName), 
-                               servName, sizeof(servName), 
-                               NI_NUMERICHOST | NI_NUMERICSERV) != 0)
-        throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
-    return std::string(dispName) + ":" + std::string(servName);
+    return SocketAddress::asString(name, namelen);
+}
+
+uint16_t getLocalPort(int fd)
+{
+    ::sockaddr_storage name_s; // big enough for any socket address
+    ::sockaddr* name = (::sockaddr*)&name_s;
+    ::socklen_t namelen = sizeof(name_s);
+
+    QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
+
+    return SocketAddress::getPort(name);
 }
 }  // namespace
 
@@ -114,13 +114,7 @@ Socket::Socket() :
     IOHandle(new IOHandlePrivate),
     nonblocking(false),
     nodelay(false)
-{
-    SOCKET& socket = impl->fd;
-    if (socket != INVALID_SOCKET) Socket::close();
-    SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
-    if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
-    socket = s;
-}
+{}
 
 Socket::Socket(IOHandlePrivate* h) :
     IOHandle(h),
@@ -128,8 +122,7 @@ Socket::Socket(IOHandlePrivate* h) :
     nodelay(false)
 {}
 
-void
-Socket::createSocket(const SocketAddress& sa) const
+void Socket::createSocket(const SocketAddress& sa) const
 {
     SOCKET& socket = impl->fd;
     if (socket != INVALID_SOCKET) Socket::close();
@@ -144,12 +137,26 @@ Socket::createSocket(const SocketAddress
         if (nonblocking) setNonblocking();
         if (nodelay) setTcpNoDelay();
     } catch (std::exception&) {
-        closesocket(s);
+        ::closesocket(s);
         socket = INVALID_SOCKET;
         throw;
     }
 }
 
+Socket* Socket::createSameTypeSocket() const {
+    SOCKET& socket = impl->fd;
+    // Socket currently has no actual socket attached
+    if (socket == INVALID_SOCKET)
+        return new Socket;
+
+    ::sockaddr_storage sa;
+    ::socklen_t salen = sizeof(sa);
+    QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+    SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+    if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+    return new Socket(new IOHandlePrivate(s));
+}
+
 void Socket::setNonblocking() const {
     u_long nonblock = 1;
     QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock));
@@ -166,21 +173,14 @@ Socket::connect(const SocketAddress& add
 {
     peername = addr.asString(false);
 
+    createSocket(addr);
+
     const SOCKET& socket = impl->fd;
-    const addrinfo *addrs = &(getAddrInfo(addr));
-    int error = 0;
+    int err;
     WSASetLastError(0);
-    while (addrs != 0) {
-        if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) ||
-            (WSAGetLastError() == WSAEWOULDBLOCK))
-            break;
-        // Error... save this error code and see if there are other address
-        // to try before throwing the exception.
-        error = WSAGetLastError();
-        addrs = addrs->ai_next;
-    }
-    if (error)
-        throw qpid::Exception(QPID_MSG(strError(error) << ": " << peername));
+    if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) &&
+        ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK))
+        throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername));
 }
 
 void
@@ -211,24 +211,26 @@ int Socket::read(void *buf, size_t count
     return received;
 }
 
-int Socket::listen(const std::string&, const std::string& port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
+{
+    SocketAddress sa(host, port);
+    return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& addr, int backlog) const
 {
+    createSocket(addr);
+
     const SOCKET& socket = impl->fd;
     BOOL yes=1;
     QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
-    struct sockaddr_in name;
-    memset(&name, 0, sizeof(name));
-    name.sin_family = AF_INET;
-    name.sin_port = htons(boost::lexical_cast<uint16_t>(port));
-    name.sin_addr.s_addr = 0;
-    if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
-        throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
+
+    if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR)
+        throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError())));
     if (::listen(socket, backlog) == SOCKET_ERROR)
-        throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError())));
-    
-    socklen_t namelen = sizeof(name);
-    QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen));
-    return ntohs(name.sin_port);
+        throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError())));
+
+    return getLocalPort(socket);
 }
 
 Socket* Socket::accept() const
@@ -243,15 +245,17 @@ Socket* Socket::accept() const
 
 std::string Socket::getPeerAddress() const
 {
-    if (peername.empty())
+    if (peername.empty()) {
         peername = getName(impl->fd, false);
+    }
     return peername;
 }
 
 std::string Socket::getLocalAddress() const
 {
-    if (localname.empty())
+    if (localname.empty()) {
         localname = getName(impl->fd, true);
+    }
     return localname;
 }
 

Propchange: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/Socket.cpp
            ('svn:executable' removed)

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp Mon Sep 19 15:13:18 2011
@@ -19,15 +19,16 @@
  *
  */
 
+#include "qpid/sys/SocketAddress.h"
+
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+
 // Ensure we get all of winsock2.h
 #ifndef _WIN32_WINNT
 #define _WIN32_WINNT 0x0501
 #endif
 
-#include "qpid/sys/SocketAddress.h"
-
-#include "qpid/sys/windows/check.h"
-
 #include <winsock2.h>
 #include <ws2tcpip.h>
 #include <string.h>
@@ -40,37 +41,111 @@ SocketAddress::SocketAddress(const std::
     port(port0),
     addrInfo(0)
 {
-    ::addrinfo hints;
-    ::memset(&hints, 0, sizeof(hints));
-    hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
-    hints.ai_socktype = SOCK_STREAM;
-
-    const char* node = 0;
-    if (host.empty()) {
-        hints.ai_flags |= AI_PASSIVE;
-    } else {
-        node = host.c_str();
-    }
-    const char* service = port.empty() ? "0" : port.c_str();
+}
 
-    int n = ::getaddrinfo(node, service, &hints, &addrInfo);
-    if (n != 0)
-        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+SocketAddress::SocketAddress(const SocketAddress& sa) :
+    host(sa.host),
+    port(sa.port),
+    addrInfo(0)
+{
+}
+
+SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
+{
+    SocketAddress temp(sa);
+
+    std::swap(temp, *this);
+    return *this;
 }
 
 SocketAddress::~SocketAddress()
 {
-    ::freeaddrinfo(addrInfo);
+    if (addrInfo) {
+        ::freeaddrinfo(addrInfo);
+    }
+}
+
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen)
+{
+    char servName[NI_MAXSERV];
+    char dispName[NI_MAXHOST];
+    if (int rc=::getnameinfo(addr, addrlen,
+        dispName, sizeof(dispName),
+                             servName, sizeof(servName),
+                             NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+        throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+    std::string s;
+    switch (addr->sa_family) {
+        case AF_INET: s += dispName; break;
+        case AF_INET6: s += "["; s += dispName; s+= "]"; break;
+        default: throw Exception(QPID_MSG("Unexpected socket type"));
+    }
+    s += ":";
+    s += servName;
+    return s;
+}
+
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
+{
+    switch (addr->sa_family) {
+        case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port);
+        case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port);
+        default:throw Exception(QPID_MSG("Unexpected socket type"));
+    }
 }
 
-std::string SocketAddress::asString(bool) const
+std::string SocketAddress::asString(bool numeric) const
 {
-    return host + ":" + port;
+    if (!numeric)
+        return host + ":" + port;
+    // Canonicalise into numeric id
+    const ::addrinfo& ai = getAddrInfo(*this);
+
+    return asString(ai.ai_addr, ai.ai_addrlen);
+}
+
+bool SocketAddress::nextAddress() {
+    bool r = currentAddrInfo->ai_next != 0;
+    if (r)
+        currentAddrInfo = currentAddrInfo->ai_next;
+    return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+    if (!currentAddrInfo) return;
+
+    ::addrinfo& ai = *currentAddrInfo;
+    switch (ai.ai_family) {
+    case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+    case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+    default: throw Exception(QPID_MSG("Unexpected socket type"));
+    }
 }
 
 const ::addrinfo& getAddrInfo(const SocketAddress& sa)
 {
-    return *sa.addrInfo;
+    if (!sa.addrInfo) {
+        ::addrinfo hints;
+        ::memset(&hints, 0, sizeof(hints));
+        hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+        hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
+        hints.ai_socktype = SOCK_STREAM;
+
+        const char* node = 0;
+        if (sa.host.empty()) {
+            hints.ai_flags |= AI_PASSIVE;
+        } else {
+            node = sa.host.c_str();
+        }
+        const char* service = sa.port.empty() ? "0" : sa.port.c_str();
+
+        int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
+        if (n != 0)
+            throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+        sa.currentAddrInfo = sa.addrInfo;
+    }
+
+    return *sa.currentAddrInfo;
 }
 
 }}

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h Mon Sep 19 15:13:18 2011
@@ -39,9 +39,6 @@ namespace qpid {
 namespace sys {
 namespace windows {
     
-class Socket;
-class Poller;
-
 /*
  * SSL/Schannel shim between the frame-handling and AsynchIO layers.
  * SslAsynchIO creates a regular AsynchIO object to handle I/O and this class

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/Thread.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/Thread.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/Thread.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/windows/Thread.cpp Mon Sep 19 15:13:18 2011
@@ -19,6 +19,11 @@
  *
  */
 
+// Ensure definition of OpenThread in mingw
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/windows/check.h"
@@ -26,50 +31,204 @@
 #include <process.h>
 #include <windows.h>
 
-namespace {
-unsigned __stdcall runRunnable(void* p)
-{
-    static_cast<qpid::sys::Runnable*>(p)->run();
-    _endthreadex(0);
-    return 0;
-}
-}
+/*
+ * This implementation distinguishes between two types of thread: Qpid
+ * threads (based on qpid::sys::Runnable) and the rest.  It provides a
+ * join() that will not deadlock against the Windows loader lock for
+ * Qpid threads.
+ *
+ * System thread identifiers are unique per Windows thread; thread
+ * handles are not.  Thread identifiers can be recycled, but keeping a
+ * handle open against the thread prevents recycling as long as
+ * shared_ptr references to a ThreadPrivate structure remain.
+ *
+ * There is a 1-1 relationship between Qpid threads and their
+ * ThreadPrivate structure.  Non-Qpid threads do not need to find the
+ * qpidThreadDone handle, so there may be a 1-many relationship for
+ * them.
+ *
+ * TLS storage is used for a lockless solution for static library
+ * builds.  The special case of LoadLibrary/FreeLibrary requires
+ * additional synchronization variables and resource cleanup in
+ * DllMain.  _DLL marks the dynamic case.
+ */
 
 namespace qpid {
 namespace sys {
 
 class ThreadPrivate {
+public:
     friend class Thread;
+    friend unsigned __stdcall runThreadPrivate(void*);
+    typedef boost::shared_ptr<ThreadPrivate> shared_ptr;
+    ~ThreadPrivate();
 
-    HANDLE threadHandle;
+private:
     unsigned threadId;
-    
-    ThreadPrivate(Runnable* runnable) {
-        uintptr_t h =  _beginthreadex(0,
-                                      0,
-                                      runRunnable,
-                                      runnable,
-                                      0,
-                                      &threadId);
-        QPID_WINDOWS_CHECK_CRT_NZ(h);
-        threadHandle = reinterpret_cast<HANDLE>(h);
+    HANDLE threadHandle;
+    HANDLE initCompleted;
+    HANDLE qpidThreadDone;
+    Runnable* runnable;
+    shared_ptr keepAlive;
+
+    ThreadPrivate() : threadId(GetCurrentThreadId()), initCompleted(NULL),
+                      qpidThreadDone(NULL), runnable(NULL) {
+        threadHandle =  OpenThread (SYNCHRONIZE, FALSE, threadId);
+        QPID_WINDOWS_CHECK_CRT_NZ(threadHandle);
     }
-    
-    ThreadPrivate()
-      : threadHandle(GetCurrentThread()), threadId(GetCurrentThreadId()) {}
+
+    ThreadPrivate(Runnable* r) : threadHandle(NULL), initCompleted(NULL),
+                                 qpidThreadDone(NULL), runnable(r) {}
+
+    void start(shared_ptr& p);
+    static shared_ptr createThread(Runnable* r);
 };
 
+}}  // namespace qpid::sys 
+
+
+namespace {
+using namespace qpid::sys;
+
+#ifdef _DLL
+class ScopedCriticalSection
+{
+  public:
+    ScopedCriticalSection(CRITICAL_SECTION& cs) : criticalSection(cs) { EnterCriticalSection(&criticalSection); }
+    ~ScopedCriticalSection() { LeaveCriticalSection(&criticalSection); }
+  private:
+    CRITICAL_SECTION& criticalSection;
+};
+
+CRITICAL_SECTION threadLock;
+long runningThreads = 0;
+HANDLE threadsDone;
+bool terminating = false;
+#endif
+
+
+DWORD volatile tlsIndex = TLS_OUT_OF_INDEXES;
+
+DWORD getTlsIndex() {
+    if (tlsIndex != TLS_OUT_OF_INDEXES)
+        return tlsIndex;        // already set
+
+    DWORD trialIndex = TlsAlloc();
+    QPID_WINDOWS_CHECK_NOT(trialIndex, TLS_OUT_OF_INDEXES); // No OS resource
+    
+    // only one thread gets to set the value
+    DWORD actualIndex = (DWORD) InterlockedCompareExchange((LONG volatile *) &tlsIndex, (LONG) trialIndex, (LONG) TLS_OUT_OF_INDEXES);
+    if (actualIndex == TLS_OUT_OF_INDEXES)
+        return trialIndex;      // we won the race
+    else {
+        TlsFree(trialIndex);
+        return actualIndex;
+    }
+}
+
+} // namespace
+
+namespace qpid {
+namespace sys {
+
+unsigned __stdcall runThreadPrivate(void* p)
+{
+    ThreadPrivate* threadPrivate = static_cast<ThreadPrivate*>(p);
+    TlsSetValue(getTlsIndex(), threadPrivate);
+
+    WaitForSingleObject (threadPrivate->initCompleted, INFINITE);
+    CloseHandle (threadPrivate->initCompleted);
+    threadPrivate->initCompleted = NULL;
+
+    try {
+        threadPrivate->runnable->run();
+    } catch (...) {
+        // not our concern
+    }
+
+    SetEvent (threadPrivate->qpidThreadDone); // allow join()
+    threadPrivate->keepAlive.reset();         // may run ThreadPrivate destructor
+
+#ifdef _DLL
+    {
+        ScopedCriticalSection l(threadLock);
+        if (--runningThreads == 0)
+            SetEvent(threadsDone);
+    }
+#endif
+    return 0;
+}
+
+
+ThreadPrivate::shared_ptr ThreadPrivate::createThread(Runnable* runnable) {
+    ThreadPrivate::shared_ptr tp(new ThreadPrivate(runnable));
+    tp->start(tp);
+    return tp;
+}
+
+void ThreadPrivate::start(ThreadPrivate::shared_ptr& tp) {
+    getTlsIndex();              // fail here if OS problem, not in new thread
+
+    initCompleted = CreateEvent (NULL, TRUE, FALSE, NULL);
+    QPID_WINDOWS_CHECK_CRT_NZ(initCompleted);
+    qpidThreadDone = CreateEvent (NULL, TRUE, FALSE, NULL);
+    QPID_WINDOWS_CHECK_CRT_NZ(qpidThreadDone);
+
+#ifdef _DLL
+    {
+        ScopedCriticalSection l(threadLock);
+        if (terminating)
+            throw qpid::Exception(QPID_MSG("creating thread after exit/FreeLibrary"));
+        runningThreads++;
+    }
+#endif
+
+    uintptr_t h =  _beginthreadex(0,
+                                  0,
+                                  runThreadPrivate,
+                                  (void *)this,
+                                  0,
+                                  &threadId);
+
+#ifdef _DLL
+    if (h == NULL) {
+        ScopedCriticalSection l(threadLock);
+        if (--runningThreads == 0)
+            SetEvent(threadsDone);
+    }
+#endif
+
+    QPID_WINDOWS_CHECK_CRT_NZ(h);
+
+    // Success
+    keepAlive = tp;
+    threadHandle = reinterpret_cast<HANDLE>(h);
+    SetEvent (initCompleted);
+}
+
+ThreadPrivate::~ThreadPrivate() {
+    if (threadHandle)
+        CloseHandle (threadHandle);
+    if (initCompleted)
+        CloseHandle (initCompleted);
+    if (qpidThreadDone)
+        CloseHandle (qpidThreadDone);
+}
+
+
 Thread::Thread() {}
 
-Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {}
+Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) {}
 
-Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {}
+Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {}
 
 Thread::operator bool() {
     return impl;
 }
 
 bool Thread::operator==(const Thread& t) const {
+    if (!impl || !t.impl)
+        return false;
     return impl->threadId == t.impl->threadId;
 }
 
@@ -79,10 +238,17 @@ bool Thread::operator!=(const Thread& t)
 
 void Thread::join() {
     if (impl) {
-        DWORD status = WaitForSingleObject (impl->threadHandle, INFINITE);
+        DWORD status;
+        if (impl->runnable) {
+            HANDLE handles[2] = {impl->qpidThreadDone, impl->threadHandle};
+            // wait for either.  threadHandle not signalled if loader
+            // lock held (FreeLibrary).  qpidThreadDone not signalled
+            // if thread terminated by exit().
+            status = WaitForMultipleObjects (2, handles, false, INFINITE);
+        }
+        else
+            status = WaitForSingleObject (impl->threadHandle, INFINITE);
         QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED);
-        CloseHandle (impl->threadHandle);
-        impl->threadHandle = 0;
     }
 }
 
@@ -92,9 +258,70 @@ unsigned long Thread::logId() {
 
 /* static */
 Thread Thread::current() {
+    ThreadPrivate* tlsValue = (ThreadPrivate *) TlsGetValue(getTlsIndex());
     Thread t;
-    t.impl.reset(new ThreadPrivate());
+    if (tlsValue != NULL) {
+        // called from within Runnable->run(), so keepAlive has positive use count
+        t.impl = tlsValue->keepAlive;
+    }
+    else
+        t.impl.reset(new ThreadPrivate());
     return t;
 }
 
-}}  /* qpid::sys */
+}}  // namespace qpid::sys
+
+
+#ifdef _DLL
+
+// DllMain: called possibly many times in a process lifetime if dll
+// loaded and freed repeatedly .  Be mindful of Windows loader lock
+// and other DllMain restrictions.
+
+BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
+    switch (reason) {
+    case DLL_PROCESS_ATTACH:
+        InitializeCriticalSection(&threadLock);
+        threadsDone = CreateEvent(NULL, TRUE, FALSE, NULL);
+        break;
+
+    case DLL_PROCESS_DETACH:
+        terminating = true;
+        if (reserved != NULL) {
+            // process exit(): threads are stopped arbitrarily and
+            // possibly in an inconsistent state.  Not even threadLock
+            // can be trusted.  All static destructors have been
+            // called at this point and any resources this unit knows
+            // about will be released as part of process tear down by
+            // the OS.  Accordingly, do nothing.
+            return TRUE;
+        }
+        else {
+            // FreeLibrary(): threads are still running and we are
+            // encouraged to clean up to avoid leaks.  Mostly we just
+            // want any straggler threads to finish and notify
+            // threadsDone as the last thing they do.
+            while (1) {
+                {
+                    ScopedCriticalSection l(threadLock);
+                    if (runningThreads == 0)
+                        break;
+                    ResetEvent(threadsDone);
+                }
+                WaitForSingleObject(threadsDone, INFINITE);
+            }
+            if (tlsIndex != TLS_OUT_OF_INDEXES)
+                TlsFree(getTlsIndex());
+            CloseHandle(threadsDone);
+            DeleteCriticalSection(&threadLock);
+        }
+        break;
+
+    case DLL_THREAD_ATTACH:
+    case DLL_THREAD_DETACH:
+        break;
+    }
+    return TRUE;
+}
+
+#endif

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/types/Variant.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/types/Variant.cpp Mon Sep 19 15:13:18 2011
@@ -19,7 +19,6 @@
  *
  */
 #include "qpid/types/Variant.h"
-#include "qpid/Msg.h"
 #include "qpid/log/Statement.h"
 #include <boost/format.hpp>
 #include <boost/lexical_cast.hpp>

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/BrokerFixture.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/BrokerFixture.h Mon Sep 19 15:13:18 2011
@@ -22,8 +22,6 @@
  *
  */
 
-#include "SocketProxy.h"
-
 #include "qpid/broker/Broker.h"
 #include "qpid/client/Connection.h"
 #include "qpid/client/ConnectionImpl.h"
@@ -71,16 +69,15 @@ struct  BrokerFixture : private boost::n
         brokerThread = qpid::sys::Thread(*broker);
     };
 
-    void shutdownBroker()
-    {
-        broker->shutdown();
-        broker = BrokerPtr();
+    void shutdownBroker() {
+        if (broker) {
+            broker->shutdown();
+            brokerThread.join();
+            broker = BrokerPtr();
+        }
     }
 
-    ~BrokerFixture() {
-        if (broker) broker->shutdown();
-        brokerThread.join();
-    }
+    ~BrokerFixture() {  shutdownBroker(); }
 
     /** Open a connection to the broker. */
     void open(qpid::client::Connection& c) {
@@ -97,20 +94,6 @@ struct LocalConnection : public qpid::cl
     ~LocalConnection() { close(); }
 };
 
-/** A local client connection via a socket proxy. */
-struct ProxyConnection : public qpid::client::Connection {
-    SocketProxy proxy;
-    ProxyConnection(int brokerPort) : proxy(brokerPort) {
-        open("localhost", proxy.getPort());
-    }
-    ProxyConnection(const qpid::client::ConnectionSettings& s) : proxy(s.port) {
-        qpid::client::ConnectionSettings proxySettings(s);
-        proxySettings.port = proxy.getPort();
-        open(proxySettings);
-    }
-    ~ProxyConnection() { close(); }
-};
-
 /** Convenience class to create and open a connection and session
  * and some related useful objects.
  */
@@ -147,7 +130,6 @@ struct  SessionFixtureT : BrokerFixture,
 };
 
 typedef SessionFixtureT<LocalConnection> SessionFixture;
-typedef SessionFixtureT<ProxyConnection> ProxySessionFixture;
 
 }} // namespace qpid::tests
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/CMakeLists.txt?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/CMakeLists.txt Mon Sep 19 15:13:18 2011
@@ -264,6 +264,14 @@ add_executable (qpid-send qpid-send.cpp 
 target_link_libraries (qpid-send qpidmessaging)
 remember_location(qpid-send)
 
+add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions})
+target_link_libraries (qpid-ping qpidclient)
+remember_location(qpid-ping)
+
+add_executable (datagen datagen.cpp ${platform_test_additions})
+target_link_libraries (datagen qpidclient)
+remember_location(datagen)
+
 # qpid-perftest and qpid-latency-test are generally useful so install them
 install (TARGETS qpid-perftest qpid-latency-test RUNTIME
          DESTINATION ${QPID_INSTALL_BINDIR})
@@ -278,7 +286,7 @@ set(test_wrap ${shell} ${CMAKE_CURRENT_S
 
 add_test (unit_test ${test_wrap} ${unit_test_LOCATION})
 add_test (start_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/start_broker${test_script_suffix})
-add_test (qpid-client-test ${test_wrap} ${qpid-client_test_LOCATION})
+add_test (qpid-client-test ${test_wrap} ${qpid-client-test_LOCATION})
 add_test (quick_perftest ${test_wrap} ${qpid-perftest_LOCATION} --summary --count 100)
 add_test (quick_topictest ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/quick_topictest${test_script_suffix})
 add_test (quick_txtest ${test_wrap} ${qpid-txtest_LOCATION} --queues 4 --tx-count 10 --quiet)
@@ -288,6 +296,7 @@ if (PYTHON_EXECUTABLE)
 endif (PYTHON_EXECUTABLE)
 add_test (stop_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/stop_broker${test_script_suffix})
 if (PYTHON_EXECUTABLE)
+  add_test (ipv6_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})
   add_test (federation_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_tests${test_script_suffix})
 if (BUILD_ACL)
   add_test (acl_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_acl_tests${test_script_suffix})

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/ClientSessionTest.cpp Mon Sep 19 15:13:18 2011
@@ -102,9 +102,9 @@ struct SimpleListener : public MessageLi
     }
 };
 
-struct ClientSessionFixture : public ProxySessionFixture
+struct ClientSessionFixture : public SessionFixture
 {
-    ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {
+    ClientSessionFixture(Broker::Options opts = Broker::Options()) : SessionFixture(opts) {
         session.queueDeclare(arg::queue="my-queue");
     }
 };
@@ -150,16 +150,6 @@ QPID_AUTO_TEST_CASE(testDispatcherThread
         BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData());
 }
 
-// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
-void testSuspend0Timeout() {
-    ClientSessionFixture fix;
-    fix.session.suspend();  // session has 0 timeout.
-    try {
-        fix.connection.resume(fix.session);
-        BOOST_FAIL("Expected InvalidArgumentException.");
-    } catch(const InternalErrorException&) {}
-}
-
 QPID_AUTO_TEST_CASE(testUseSuspendedError)
 {
     ClientSessionFixture fix;
@@ -171,18 +161,6 @@ QPID_AUTO_TEST_CASE(testUseSuspendedErro
     } catch(const NotAttachedException&) {}
 }
 
-// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
-void testSuspendResume() {
-    ClientSessionFixture fix;
-    fix.session.timeout(60);
-    fix.session.suspend();
-    // Make sure we are still subscribed after resume.
-    fix.connection.resume(fix.session);
-    fix.session.messageTransfer(arg::content=Message("my-message", "my-queue"));
-    BOOST_CHECK_EQUAL("my-message", fix.subs.get("my-queue", TIME_SEC).getData());
-}
-
-
 QPID_AUTO_TEST_CASE(testSendToSelf) {
     ClientSessionFixture fix;
     SimpleListener mylistener;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/ExchangeTest.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/ExchangeTest.cpp Mon Sep 19 15:13:18 2011
@@ -253,7 +253,7 @@ QPID_AUTO_TEST_CASE(testIVEOption)
     TopicExchange topic ("topic1", false, args);
 
     intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a", "abc");
+    msg1->insertCustomProperty("a", "abc");
     DeliverableMessage dmsg1(msg1);
 
     FieldTable args2;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/Makefile.am?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/Makefile.am Mon Sep 19 15:13:18 2011
@@ -304,9 +304,9 @@ TESTS_ENVIRONMENT = \
 
 system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \
   run_msg_group_tests
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \
   run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
-  run_queue_flow_limit_tests
+  run_queue_flow_limit_tests ipv6_test
 
 EXTRA_DIST +=								\
   run_test vg_check							\
@@ -321,6 +321,8 @@ EXTRA_DIST +=								\
   config.null								\
   ais_check								\
   run_federation_tests							\
+  run_federation_sys_tests                  \
+  run_long_federation_sys_tests             \
   run_cli_tests								\
   run_acl_tests								\
   .valgrind.supp							\
@@ -362,6 +364,7 @@ LONG_TESTS+=start_broker \
  fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
  run_msg_groups_tests_soak \
  stop_broker \
+ run_long_federation_sys_tests \
  run_failover_soak reliable_replication_test \
  federated_cluster_test_with_node_failure
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/MessageReplayTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/MessageReplayTracker.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/MessageReplayTracker.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/MessageReplayTracker.cpp Mon Sep 19 15:13:18 2011
@@ -51,7 +51,7 @@ class ReplayBufferChecker
 
 QPID_AUTO_TEST_CASE(testReplay)
 {
-    ProxySessionFixture fix;
+    SessionFixture fix;
     fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
 
     MessageReplayTracker tracker(10);
@@ -77,7 +77,7 @@ QPID_AUTO_TEST_CASE(testReplay)
 
 QPID_AUTO_TEST_CASE(testCheckCompletion)
 {
-    ProxySessionFixture fix;
+    SessionFixture fix;
     fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
 
     MessageReplayTracker tracker(10);

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Sep 19 15:13:18 2011
@@ -611,6 +611,28 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueu
     fix.admin.deleteQueue("q");
 }
 
+QPID_AUTO_TEST_CASE(testAssertExchangeOption)
+{
+    MessagingFixture fix;
+    std::string a1 = "e; {create:always, assert:always, node:{type:topic, x-declare:{type:direct, arguments:{qpid.msg_sequence:True}}}}";
+    Sender s1 = fix.session.createSender(a1);
+    s1.close();
+    Receiver r1 = fix.session.createReceiver(a1);
+    r1.close();
+
+    std::string a2 = "e; {assert:receiver, node:{type:topic, x-declare:{type:fanout, arguments:{qpid.msg_sequence:True}}}}";
+    Sender s2 = fix.session.createSender(a2);
+    s2.close();
+    BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::AssertionFailed);
+
+    std::string a3 = "e; {assert:sender, node:{x-declare:{arguments:{qpid.msg_sequence:False}}}}";
+    BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::AssertionFailed);
+    Receiver r3 = fix.session.createReceiver(a3);
+    r3.close();
+
+    fix.admin.deleteExchange("e");
+}
+
 QPID_AUTO_TEST_CASE(testGetSender)
 {
     QueueFixture fix;
@@ -1064,6 +1086,20 @@ QPID_AUTO_TEST_CASE(testAcknowledgeUpTo)
     BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
 }
 
+QPID_AUTO_TEST_CASE(testCreateBindingsOnStandardExchange)
+{
+    QueueFixture fix;
+    Sender sender = fix.session.createSender((boost::format("amq.direct; {create:always, node:{type:topic, x-bindings:[{queue:%1%, key:my-subject}]}}") % fix.queue).str());
+    Message out("test-message");
+    out.setSubject("my-subject");
+    sender.send(out);
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+    Message in = receiver.fetch(Duration::SECOND * 5);
+    fix.session.acknowledge();
+    BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+    BOOST_CHECK_EQUAL(in.getSubject(), out.getSubject());
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/Qmf2.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/Qmf2.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/Qmf2.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/Qmf2.cpp Mon Sep 19 15:13:18 2011
@@ -23,12 +23,36 @@
 #include "qmf/QueryImpl.h"
 #include "qmf/SchemaImpl.h"
 #include "qmf/exceptions.h"
-
+#include "qpid/messaging/Connection.h"
+#include "qmf/PosixEventNotifierImpl.h"
+#include "qmf/AgentSession.h"
+#include "qmf/AgentSessionImpl.h"
+#include "qmf/ConsoleSession.h"
+#include "qmf/ConsoleSessionImpl.h"
 #include "unit_test.h"
 
+using namespace std;
 using namespace qpid::types;
+using namespace qpid::messaging;
 using namespace qmf;
 
+bool isReadable(int fd)
+{
+    fd_set rfds;
+    struct timeval tv;
+    int nfds, result;
+
+    FD_ZERO(&rfds);
+    FD_SET(fd, &rfds);
+    nfds = fd + 1;
+    tv.tv_sec = 0;
+    tv.tv_usec = 0;
+
+    result = select(nfds, &rfds, NULL, NULL, &tv);
+
+    return result > 0;
+}
+
 namespace qpid {
 namespace tests {
 
@@ -315,6 +339,84 @@ QPID_AUTO_TEST_CASE(testSchema)
     BOOST_CHECK_THROW(method.getArgument(3), QmfException);
 }
 
+QPID_AUTO_TEST_CASE(testAgentSessionEventListener)
+{
+    Connection connection("localhost");
+    AgentSession session(connection, "");
+    posix::EventNotifier notifier(session);
+
+    AgentSessionImpl& sessionImpl = AgentSessionImplAccess::get(session);
+            
+    BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+}
+
+QPID_AUTO_TEST_CASE(testConsoleSessionEventListener)
+{
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    posix::EventNotifier notifier(session);
+
+    ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
+
+    BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+}
+
+QPID_AUTO_TEST_CASE(testGetHandle)
+{
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    posix::EventNotifier notifier(session);
+
+    BOOST_CHECK(notifier.getHandle() > 0);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadableToFalse)
+{
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    posix::EventNotifier notifier(session);
+    PosixEventNotifierImplAccess::get(notifier).setReadable(false);
+
+    bool readable(isReadable(notifier.getHandle()));
+    BOOST_CHECK(!readable);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadable)
+{
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    posix::EventNotifier notifier(session);
+    PosixEventNotifierImplAccess::get(notifier).setReadable(true);
+
+    bool readable(isReadable(notifier.getHandle()));
+    BOOST_CHECK(readable);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadableMultiple)
+{
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    posix::EventNotifier notifier(session);
+    for (int i = 0; i < 15; i++)
+        PosixEventNotifierImplAccess::get(notifier).setReadable(true);
+    PosixEventNotifierImplAccess::get(notifier).setReadable(false);
+
+    bool readable(isReadable(notifier.getHandle()));
+    BOOST_CHECK(!readable);
+}
+
+QPID_AUTO_TEST_CASE(testDeleteNotifier)
+{
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
+    {
+        posix::EventNotifier notifier(session);
+        BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+    }
+    BOOST_CHECK(sessionImpl.getEventNotifier() == 0);
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueEvents.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueEvents.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueEvents.cpp Mon Sep 19 15:13:18 2011
@@ -147,7 +147,7 @@ struct EventRecorder
 
 QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing)
 {
-    ProxySessionFixture fixture;
+    SessionFixture fixture;
     //register dummy event listener to broker
     EventRecorder listener;
     fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
@@ -194,7 +194,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEvent
 
 QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly)
 {
-    ProxySessionFixture fixture;
+    SessionFixture fixture;
     //register dummy event listener to broker
     EventRecorder listener;
     fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/QueuePolicyTest.cpp Mon Sep 19 15:13:18 2011
@@ -152,7 +152,7 @@ QPID_AUTO_TEST_CASE(testRingPolicyCount)
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
     policy->update(args);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("my-ring-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     for (int i = 0; i < 10; i++) {
@@ -187,7 +187,7 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
     policy->update(args);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("my-ring-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
 
@@ -259,7 +259,7 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT);
     policy->update(args);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("my-ring-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     LocalQueue incoming;
@@ -285,7 +285,7 @@ QPID_AUTO_TEST_CASE(testPolicyWithDtx)
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
     policy->update(args);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("my-policy-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     LocalQueue incoming;
@@ -345,7 +345,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNo
     // Disable flow control, or else we'll never hit the max limit
     args.setInt(QueueFlowLimit::flowStopCountKey, 0);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("my-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     LocalQueue incoming;
@@ -371,7 +371,7 @@ QPID_AUTO_TEST_CASE(testPolicyFailureOnC
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
     policy->update(args);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("q");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     f.session.txSelect();
@@ -388,7 +388,7 @@ QPID_AUTO_TEST_CASE(testCapacityConversi
     args.setString("qpid.max_count", "5");
     args.setString("qpid.flow_stop_count", "0");
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("q");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     for (int i = 0; i < 5; i++) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp Mon Sep 19 15:13:18 2011
@@ -81,13 +81,14 @@ public:
     Message& getMessage() { return *(msg.get()); }
 };
 
-intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) {
+intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
     intrusive_ptr<Message> msg(new Message());
     AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     msg->getFrames().append(method);
     msg->getFrames().append(header);
     msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+    if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
     return msg;
 }
 
@@ -441,10 +442,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
 
-	msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-	msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-	msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
-	msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+	msg1->insertCustomProperty(key,"a");
+	msg2->insertCustomProperty(key,"b");
+	msg3->insertCustomProperty(key,"c");
+	msg4->insertCustomProperty(key,"a");
 
 	//enqueue 4 message
     queue->deliver(msg1);
@@ -466,9 +467,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
     intrusive_ptr<Message> msg5 = create_message("e", "A");
     intrusive_ptr<Message> msg6 = create_message("e", "B");
     intrusive_ptr<Message> msg7 = create_message("e", "C");
-	msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-	msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-	msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+	msg5->insertCustomProperty(key,"a");
+	msg6->insertCustomProperty(key,"b");
+	msg7->insertCustomProperty(key,"c");
     queue->deliver(msg5);
     queue->deliver(msg6);
     queue->deliver(msg7);
@@ -503,7 +504,7 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
 
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    msg1->insertCustomProperty(key,"a");
     queue->deliver(msg1);
     queue->deliver(msg2);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -535,12 +536,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
 
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-    msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
-    msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-    msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+    msg1->insertCustomProperty(key,"a");
+    msg2->insertCustomProperty(key,"b");
+    msg3->insertCustomProperty(key,"c");
+    msg4->insertCustomProperty(key,"a");
+    msg5->insertCustomProperty(key,"b");
+    msg6->insertCustomProperty(key,"c");
 
     //enqueue 4 message
     queue->deliver(msg1);
@@ -605,8 +606,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
     args.getLVQKey(key);
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    msg1->insertCustomProperty(key,"a");
+    msg2->insertCustomProperty(key,"a");
 
     queue1->deliver(msg1);
     queue2->deliver(msg1);
@@ -649,8 +650,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
     args.getLVQKey(key);
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    msg1->insertCustomProperty(key,"a");
+    msg2->insertCustomProperty(key,"a");
 	// 3
     queue1->deliver(msg1);
     // 4
@@ -670,12 +671,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
 void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
 {
     for (uint i = 0; i < count; i++) {
-        intrusive_ptr<Message> m = create_message("exchange", "key");
-        if (i % 2) {
-            if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
-        } else {
-            if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
-        }
+        intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
         m->setTimestamp(new broker::ExpiryPolicy);
         queue.deliver(m);
     }
@@ -738,8 +734,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
                              std::string("c"), std::string("c"), std::string("c") };
     for (int i = 0; i < 9; ++i) {
         intrusive_ptr<Message> msg = create_message("e", "A");
-        msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", groups[i]);
-        msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", i);
+        msg->insertCustomProperty("GROUP-ID", groups[i]);
+        msg->insertCustomProperty("MY-ID", i);
         queue->deliver(msg);
     }
 
@@ -885,8 +881,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     // Owners= ^C3,
 
     intrusive_ptr<Message> msg = create_message("e", "A");
-    msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", "a");
-    msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", 9);
+    msg->insertCustomProperty("GROUP-ID", "a");
+    msg->insertCustomProperty("MY-ID", 9);
     queue->deliver(msg);
 
     // Queue = a-2, a-9
@@ -896,8 +892,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     BOOST_CHECK( !gotOne );
 
     msg = create_message("e", "A");
-    msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", "b");
-    msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", 10);
+    msg->insertCustomProperty("GROUP-ID", "b");
+    msg->insertCustomProperty("MY-ID", 10);
     queue->deliver(msg);
 
     // Queue = a-2, a-9, b-10
@@ -927,7 +923,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     for (int i = 0; i < 3; ++i) {
         intrusive_ptr<Message> msg = create_message("e", "A");
         // no "GROUP-ID" header
-        msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", i);
+        msg->insertCustomProperty("MY-ID", i);
         queue->deliver(msg);
     }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/ReplicationTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/ReplicationTest.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/ReplicationTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/ReplicationTest.cpp Mon Sep 19 15:13:18 2011
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testReplicationExcha
 {
     qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of<string>("qpidd")
                                                            ("--replication-exchange-name=qpid.replication")));
-    ProxySessionFixture f(brokerOpts);
+    SessionFixture f(brokerOpts);
 
 
     std::string dataQ("queue-1");

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/SessionState.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/SessionState.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/SessionState.cpp Mon Sep 19 15:13:18 2011
@@ -43,7 +43,7 @@ using namespace qpid::framing;
 // Apply f to [begin, end) and accumulate the result
 template <class Iter, class T, class F>
 T applyAccumulate(Iter begin, Iter end, T seed, const F& f) {
-    return std::accumulate(begin, end, seed, bind(std::plus<T>(), _1, bind(f, _2)));
+    return std::accumulate(begin, end, seed, boost::bind(std::plus<T>(), _1, boost::bind(f, _2)));
 }
 
 // Create a frame with a one-char string.
@@ -105,8 +105,8 @@ size_t transferN(qpid::SessionState& s, 
         char last = content[content.size()-1];
         content.resize(content.size()-1);
         size += applyAccumulate(content.begin(), content.end(), 0,
-                                bind(&send, ref(s),
-                                     bind(contentFrameChar, _1, false)));
+                                boost::bind(&send, boost::ref(s),
+                                     boost::bind(contentFrameChar, _1, false)));
         size += send(s, contentFrameChar(last, true));
     }
     return size;
@@ -115,7 +115,7 @@ size_t transferN(qpid::SessionState& s, 
 // Send multiple transfers with single-byte content.
 size_t transfers(qpid::SessionState& s, string content) {
     return applyAccumulate(content.begin(), content.end(), 0,
-                           bind(transfer1Char, ref(s), _1));
+                           boost::bind(transfer1Char, boost::ref(s), _1));
 }
 
 size_t contentFrameSize(size_t n=1) { return AMQFrame(( AMQContentBody())).encodedSize() + n; }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/TxPublishTest.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/TxPublishTest.cpp Mon Sep 19 15:13:18 2011
@@ -50,10 +50,9 @@ struct TxPublishTest
     TxPublishTest() :
         queue1(new Queue("queue1", false, &store, 0)),
         queue2(new Queue("queue2", false, &store, 0)),
-        msg(MessageUtils::createMessage("exchange", "routing_key", false, "id")),
+        msg(MessageUtils::createMessage("exchange", "routing_key", true)),
         op(msg)
     {
-        msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
         op.deliverTo(queue1);
         op.deliverTo(queue2);
     }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/Url.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/Url.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/Url.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/Url.cpp Mon Sep 19 15:13:18 2011
@@ -60,6 +60,32 @@ QPID_AUTO_TEST_CASE(TestParseXyz) {
     BOOST_CHECK_EQUAL(Url("xyz:host").str(), "amqp:xyz:host:5672");
 }
 
+QPID_AUTO_TEST_CASE(TestParseTricky) {
+    BOOST_CHECK_EQUAL(Url("amqp").str(), "amqp:tcp:amqp:5672");
+    BOOST_CHECK_EQUAL(Url("amqp:tcp").str(), "amqp:tcp:tcp:5672");
+    // These are ambiguous parses and arguably not the best result
+    BOOST_CHECK_EQUAL(Url("amqp:876").str(), "amqp:tcp:876:5672");
+    BOOST_CHECK_EQUAL(Url("tcp:567").str(), "amqp:tcp:567:5672");
+}
+
+QPID_AUTO_TEST_CASE(TestParseIPv6) {
+    Url u1("[::]");
+    BOOST_CHECK_EQUAL(u1[0].host, "::");
+    BOOST_CHECK_EQUAL(u1[0].port, 5672);
+    Url u2("[::1]");
+    BOOST_CHECK_EQUAL(u2[0].host, "::1");
+    BOOST_CHECK_EQUAL(u2[0].port, 5672);
+    Url u3("[::127.0.0.1]");
+    BOOST_CHECK_EQUAL(u3[0].host, "::127.0.0.1");
+    BOOST_CHECK_EQUAL(u3[0].port, 5672);
+    Url u4("[2002::222:68ff:fe0b:e61a]");
+    BOOST_CHECK_EQUAL(u4[0].host, "2002::222:68ff:fe0b:e61a");
+    BOOST_CHECK_EQUAL(u4[0].port, 5672);
+    Url u5("[2002::222:68ff:fe0b:e61a]:123");
+    BOOST_CHECK_EQUAL(u5[0].host, "2002::222:68ff:fe0b:e61a");
+    BOOST_CHECK_EQUAL(u5[0].port, 123);
+}
+
 QPID_AUTO_TEST_CASE(TestParseMultiAddress) {
     Url::addProtocol("xyz");
     URL_CHECK_STR("amqp:tcp:host:0,xyz:foo:123,tcp:foo:0,xyz:bar:1");

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/XmlClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/XmlClientSessionTest.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/XmlClientSessionTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/XmlClientSessionTest.cpp Mon Sep 19 15:13:18 2011
@@ -90,7 +90,7 @@ struct SimpleListener : public MessageLi
     }
 };
 
-struct ClientSessionFixture : public ProxySessionFixture
+struct ClientSessionFixture : public SessionFixture
 {
     void declareSubscribe(const string& q="odd_blue",
                           const string& dest="xml")

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/allhosts
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/allhosts?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/allhosts (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/allhosts Mon Sep 19 15:13:18 2011
@@ -29,11 +29,12 @@ Options:
   -s SECONDS  sleep between starting commands.
   -q          don't print banner lines for each host.
   -o SUFFIX   log output of each command to <host>.SUFFIX
+  -X          passed to ssh - forward X connection.
 "
     exit 1
 }
 
-while getopts "tl:bs:dqo:" opt; do
+while getopts "tl:bs:dqo:X" opt; do
     case $opt in
 	l) SSHOPTS="-l$OPTARG $SSHOPTS" ;;
 	t) SSHOPTS="-t $SSHOPTS" ;;
@@ -42,6 +43,7 @@ while getopts "tl:bs:dqo:" opt; do
 	s) SLEEP="sleep $OPTARG" ;;
 	q) NOBANNER=1 ;;
 	o) SUFFIX=$OPTARG ;;
+	X) SSHOPTS="-X $SSHOPTS" ;;
 	*) usage;;
     esac
 done

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/brokertest.py?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/brokertest.py Mon Sep 19 15:13:18 2011
@@ -251,7 +251,7 @@ class Broker(Popen):
     def get_log(self):
         return os.path.abspath(self.log)
 
-    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None):
+    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
         """Start a broker daemon. name determines the data-dir and log
         file names."""
 
@@ -280,6 +280,7 @@ class Broker(Popen):
             cmd += ["--log-enable=%s" % log_level]
         self.datadir = self.name
         cmd += ["--data-dir", self.datadir]
+        if show_cmd: print cmd
         Popen.__init__(self, cmd, expect, stdout=PIPE)
         test.cleanup_stop(self)
         self._host = "127.0.0.1"
@@ -400,7 +401,7 @@ class Cluster:
 
     _cluster_count = 0
 
-    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
         self.test = test
         self._brokers=[]
         self.name = "cluster%d" % Cluster._cluster_count
@@ -411,16 +412,19 @@ class Cluster:
         self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
         assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
         self.args += [ "--load-module", BrokerTest.cluster_lib ]
-        self.start_n(count, expect=expect, wait=wait)
+        self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
 
-    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
+    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
         """Add a broker to the cluster. Returns the index of the new broker."""
         if not name: name="%s-%d" % (self.name, len(self._brokers))
-        self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
+        self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
         return self._brokers[-1]
 
-    def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
-        for i in range(count): self.start(expect=expect, wait=wait, args=args)
+    def ready(self):
+        for b in self: b.ready()
+
+    def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
+        for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
 
     # Behave like a list of brokers.
     def __len__(self): return len(self._brokers)
@@ -477,31 +481,30 @@ class BrokerTest(TestCase):
         self.cleanup_stop(p)
         return p
 
-    def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
+    def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False):
         """Create and return a broker ready for use"""
-        b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
+        b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd)
         if (wait):
             try: b.ready()
             except Exception, e:
                 raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
         return b
 
-    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
         """Create and return a cluster ready for use"""
-        cluster = Cluster(self, count, args, expect=expect, wait=wait)
+        cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
         return cluster
 
     def browse(self, session, queue, timeout=0):
-        """Assert that the contents of messages on queue (as retrieved
-        using session and timeout) exactly match the strings in
-        expect_contents"""
+        """Return a list with the contents of each message on queue."""
         r = session.receiver("%s;{mode:browse}"%(queue))
+        r.capacity = 100
         try:
             contents = []
             try:
                 while True: contents.append(r.fetch(timeout=timeout).content)
             except messaging.Empty: pass
-        finally: pass                   #FIXME aconway 2011-04-14: r.close()
+        finally: r.close()
         return contents
 
     def assert_browse(self, session, queue, expect_contents, timeout=0):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message