qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject svn commit: r887956 - in /qpid/trunk/qpid/cpp/src/qpid/sys: DeletionManager.h epoll/EpollPoller.cpp
Date Mon, 07 Dec 2009 15:42:15 GMT
Author: astitcher
Date: Mon Dec  7 15:42:14 2009
New Revision: 887956

URL: http://svn.apache.org/viewvc?rev=887956&view=rev
Log:
QPID-2214: Opening and closing client connections causes memory use to grow unboundedly
- Clean up the DeletionManager state for each thread when the thread exits

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/DeletionManager.h
    qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DeletionManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DeletionManager.h?rev=887956&r1=887955&r2=887956&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DeletionManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DeletionManager.h Mon Dec  7 15:42:14 2009
@@ -54,6 +54,8 @@
 template <typename H>
 class DeletionManager 
 {
+    struct ThreadStatus;
+
 public:
     // Mark every thread as using the handle - it will be deleted
     // below after every thread marks the handle as unused
@@ -65,6 +67,28 @@
     // handles get deleted here when no one else
     // is using them either 
     static void markAllUnusedInThisThread() {
+        ThreadStatus* threadStatus = getThreadStatus();
+        ScopedLock<Mutex> l(threadStatus->lock);
+
+        // The actual deletions will happen here when all the shared_ptr
+        // ref counts hit 0 (that is when every thread marks the handle unused)
+        threadStatus->handles.clear();
+    }
+
+    static void destroyThreadState() {
+        ThreadStatus* threadStatus = getThreadStatus();
+        {
+        ScopedLock<Mutex> l(threadStatus->lock);
+
+        allThreadsStatuses.delThreadStatus(threadStatus);
+        }
+        delete threadStatus;
+        threadStatus = 0;
+    }
+
+private:
+
+    static ThreadStatus*& getThreadStatus() {
         static __thread ThreadStatus* threadStatus = 0;
 
         // Thread local vars can't be dynamically constructed so we need
@@ -75,14 +99,9 @@
             allThreadsStatuses.addThreadStatus(threadStatus);
         }
 
-        ScopedLock<Mutex> l(threadStatus->lock);
-        
-        // The actual deletions will happen here when all the shared_ptr
-        // ref counts hit 0 (that is when every thread marks the handle unused)
-        threadStatus->handles.clear();
+        return threadStatus;
     }
 
-private:
     typedef boost::shared_ptr<H> shared_ptr;
 
     // In theory we know that we never need more handles than the number of
@@ -125,6 +144,15 @@
             statuses.push_back(t);
         }
 
+        void delThreadStatus(ThreadStatus* t) {
+            ScopedLock<Mutex> l(lock);
+            typename std::vector<ThreadStatus*>::iterator it =
+                std::find(statuses.begin(),statuses.end(), t);
+            if (it != statuses.end()) {
+                statuses.erase(it);
+            }
+        }
+
         void addHandle(shared_ptr h) {
             ScopedLock<Mutex> l(lock);
             std::for_each(statuses.begin(), statuses.end(), handleAdder(h));

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=887956&r1=887955&r2=887956&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Dec  7 15:42:14 2009
@@ -25,6 +25,7 @@
 #include "qpid/sys/DeletionManager.h"
 #include "qpid/sys/posix/check.h"
 #include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/log/Statement.h"
 
 #include <sys/epoll.h>
 #include <errno.h>
@@ -467,28 +468,35 @@
 }
 
 void Poller::run() {
-    // Make sure we can't be interrupted by signals at a bad time
-    ::sigset_t ss;
-    ::sigfillset(&ss);
-    ::pthread_sigmask(SIG_SETMASK, &ss, 0);
-
-    do {
-        Event event = wait();
-
-        // If can read/write then dispatch appropriate callbacks
-        if (event.handle) {
-            event.process();
-        } else {
-            // Handle shutdown
-            switch (event.type) {
-            case SHUTDOWN:
-                return;
-            default:
-                // This should be impossible
-                assert(false);
+    // Ensure that we exit thread responsibly under all circumstances
+    try {
+        // Make sure we can't be interrupted by signals at a bad time
+        ::sigset_t ss;
+        ::sigfillset(&ss);
+        ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+        do {
+            Event event = wait();
+
+            // If can read/write then dispatch appropriate callbacks
+            if (event.handle) {
+                event.process();
+            } else {
+                // Handle shutdown
+                switch (event.type) {
+                case SHUTDOWN:
+                    PollerHandleDeletionManager.destroyThreadState();
+                    return;
+                default:
+                    // This should be impossible
+                    assert(false);
+                }
             }
-        }
-    } while (true);
+        } while (true);
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what());
+    }
+    PollerHandleDeletionManager.destroyThreadState();
 }
 
 Poller::Event Poller::wait(Duration timeout) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message