qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r745226 - in /qpid/trunk/qpid/cpp/src/qpid: client/SubscriptionManager.cpp client/SubscriptionManager.h cluster/Cluster.cpp
Date Tue, 17 Feb 2009 20:18:38 GMT
Author: aconway
Date: Tue Feb 17 20:18:38 2009
New Revision: 745226

URL: http://svn.apache.org/viewvc?rev=745226&view=rev
Log:
Minor fixes. 

client/SubscriptionManager: made it thread safe, was causing latencytest to crash with --rate
and --time-limit.

cluster/Cluster.cpp: don't call cpg_leave during shutdown. Not required and a problem if shutdown
was caused by a cpg error.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=745226&r1=745225&r2=745226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Tue Feb 17 20:18:38 2009
@@ -41,6 +41,7 @@
 Subscription SubscriptionManager::subscribe(
     MessageListener& listener, const std::string& q, const SubscriptionSettings&
ss, const std::string& n)
 {
+    sys::Mutex::ScopedLock l(lock);
     std::string name=n.empty() ? q:n;
     boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss,
name, &listener);
     dispatcher.listen(si);
@@ -52,6 +53,7 @@
 Subscription SubscriptionManager::subscribe(
     LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const
std::string& n)
 {
+    sys::Mutex::ScopedLock l(lock);
     std::string name=n.empty() ? q:n;
     boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss,
name, 0);
     lq.queue=si->divert();
@@ -74,13 +76,14 @@
 
 void SubscriptionManager::cancel(const std::string& dest)
 {
+    sys::Mutex::ScopedLock l(lock);
     std::map<std::string, Subscription>::iterator i = subscriptions.find(dest);
     if (i != subscriptions.end()) {
         sync(session).messageCancel(dest);
         dispatcher.cancel(dest);
         Subscription s = i->second;
-        if (s.isValid()) subscriptions[dest].impl->cancelDiversion();
-        subscriptions.erase(dest);
+        if (s.isValid()) s.impl->cancelDiversion();
+        subscriptions.erase(i);
     }
 }
 
@@ -131,6 +134,7 @@
 Session SubscriptionManager::getSession() const { return session; }
 
 Subscription SubscriptionManager::getSubscription(const std::string& name) const {
+    sys::Mutex::ScopedLock l(lock);
     std::map<std::string, Subscription>::const_iterator i = subscriptions.find(name);
     if (i == subscriptions.end())
         throw Exception(QPID_MSG("Subscription not found: " << name));

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=745226&r1=745225&r2=745226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Tue Feb 17 20:18:38 2009
@@ -95,14 +95,6 @@
  */
 class SubscriptionManager : public sys::Runnable
 {
-    typedef sys::Mutex::ScopedLock Lock;
-    typedef sys::Mutex::ScopedUnlock Unlock;
-
-    qpid::client::Dispatcher dispatcher;
-    qpid::client::AsyncSession session;
-    bool autoStop;
-    SubscriptionSettings defaultSettings;
-    
   public:
     /** Create a new SubscriptionManager associated with a session */
     SubscriptionManager(const Session& session);
@@ -271,6 +263,11 @@
     Session getSession() const;
 
   private:
+    mutable sys::Mutex lock;
+    qpid::client::Dispatcher dispatcher;
+    qpid::client::AsyncSession session;
+    bool autoStop;
+    SubscriptionSettings defaultSettings;
     std::map<std::string, Subscription> subscriptions;
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=745226&r1=745225&r2=745226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Feb 17 20:18:38 2009
@@ -193,10 +193,6 @@
     if (state != LEFT) {
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
-        try { cpg.leave(); }
-        catch (const std::exception& e) {
-            QPID_LOG(critical, *this << " error leaving process group: " << e.what());
-        }
         connections.clear();
         try { broker.shutdown(); }
         catch (const std::exception& e) {
@@ -371,7 +367,6 @@
 // callbacks will be invoked.
 // 
 void Cluster::brokerShutdown()  {
-    QPID_LOG(notice, *this << " shutting down ");
     if (state != LEFT) {
         try { cpg.shutdown(); }
         catch (const std::exception& e) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message