qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r724371 - in /incubator/qpid/trunk/qpid/cpp/src: cluster.mk qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/cluster/ConnectionMap.h
Date Mon, 08 Dec 2008 14:57:06 GMT
Author: aconway
Date: Mon Dec  8 06:57:05 2008
New Revision: 724371

URL: http://svn.apache.org/viewvc?rev=724371&view=rev
Log:
Cluster: separated connection map lock to reduce contention.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=724371&r1=724370&r2=724371&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Mon Dec  8 06:57:05 2008
@@ -31,6 +31,7 @@
   qpid/cluster/ConnectionCodec.cpp \
   qpid/cluster/Connection.h \
   qpid/cluster/Connection.cpp \
+  qpid/cluster/ConnectionMap.h \
   qpid/cluster/NoOpConnectionOutputHandler.h \
   qpid/cluster/WriteEstimate.h \
   qpid/cluster/WriteEstimate.cpp \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=724371&r1=724370&r2=724371&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Dec  8 06:57:05 2008
@@ -128,15 +128,11 @@
     if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
 }
 
-bool Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
-    Lock l(lock);
-    bool result = connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)).second;
-    assert(result);
-    return result;
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+    connections.insert(c->getId(), c);
 }
 
 void Cluster::erase(ConnectionId id) {
-    Lock l(lock);
     connections.erase(id);
 }
 
@@ -226,29 +222,15 @@
     }
 }
 
-boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId,
Lock&)  {
-    ConnectionMap::iterator i = connections.find(connectionId);
-    if (i == connections.end()) { 
-        if (connectionId.getMember() == myId) { // Closed local connection
-            QPID_LOG(debug, *this << " activity on closed connection: " << connectionId);
-            return boost::intrusive_ptr<Connection>();
-        }
-        else {                  // New shadow connection
-            std::ostringstream mgmtId;
-            mgmtId << name << ":"  << connectionId;
-            ConnectionMap::value_type value(connectionId,
-                                            new Connection(*this, shadowOut, mgmtId.str(),
connectionId));
-            i = connections.insert(value).first;
-        }
-    }
-    return i->second;
-}
-
-Cluster::Connections Cluster::getConnections(Lock&) {
-    Connections result(connections.size());
-    std::transform(connections.begin(), connections.end(), result.begin(),
-                   boost::bind(&ConnectionMap::value_type::second, _1));
-    return result;
+boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId)
 {
+    boost::intrusive_ptr<Connection> cp = connections.find(connectionId);
+    if (!cp && connectionId.getMember() != myId) { // New shadow connection
+        std::ostringstream mgmtId;
+        mgmtId << name << ":"  << connectionId;
+        cp = new Connection(*this, shadowOut, mgmtId.str(), connectionId);
+        connections.insert(connectionId, cp);
+    }
+    return cp;
 }
 
 void Cluster::deliver(          
@@ -299,7 +281,7 @@
             QPID_LOG(trace, *this << " DROP: " << e);
         }
         else {
-            boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(),
l);
+            boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
             if (!connection) return;
             if (e.getType() == CONTROL) {              
                 while (frame.decode(buf)) {
@@ -479,7 +461,7 @@
 
 // FIXME aconway 2008-10-15: no longer need a separate control now
 // that the dump control is in the deliver queue.
-void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string&
urlStr, Lock& l) {
+void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string&
urlStr, Lock&) {
     if (state == LEFT) return;
     MemberId dumpee(dumpeeInt);
     Url url(urlStr);
@@ -489,7 +471,7 @@
     deliverQueue.stop();
     if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
     dumpThread = Thread(
-        new DumpClient(myId, dumpee, url, broker, map, getConnections(l), 
+        new DumpClient(myId, dumpee, url, broker, map, connections.values(),
                        boost::bind(&Cluster::dumpOutDone, this),
                        boost::bind(&Cluster::dumpOutError, this, _1)));
 }
@@ -587,16 +569,8 @@
         mgmtObject->set_members(urlstr);
     }
 
-    //close connections belonging to members that have now been excluded
-    for (ConnectionMap::iterator i = connections.begin(); i != connections.end();) {
-        MemberId member = i->first.getMember();
-        if (member != myId && !map.isMember(member)) { 
-            i->second->left();
-            connections.erase(i++);
-        } else {
-            i++;
-        }
-    }
+    // Close connections belonging to members that have now been excluded
+    connections.update(myId, map);
 }
 
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=724371&r1=724370&r2=724371&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Dec  8 06:57:05 2008
@@ -23,6 +23,7 @@
 #include "Event.h"
 #include "NoOpConnectionOutputHandler.h"
 #include "ClusterMap.h"
+#include "ConnectionMap.h"
 #include "FailoverExchange.h"
 #include "Quorum.h"
 
@@ -72,7 +73,7 @@
     virtual ~Cluster();
 
     // Connection map
-    bool insert(const ConnectionPtr&); 
+    void insert(const ConnectionPtr&); 
     void erase(ConnectionId);       
     
     // Send to the cluster 
@@ -101,7 +102,6 @@
     typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr;
     typedef sys::Monitor::ScopedLock Lock;
 
-    typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> >
ConnectionMap;
     typedef sys::PollableQueue<Event> PollableEventQueue;
     typedef std::deque<Event> PlainEventQueue;
 
@@ -160,8 +160,7 @@
         struct cpg_address */*joined*/, int /*nJoined*/
     );
 
-    boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&,
Lock&);
-    Connections getConnections(Lock&); 
+    boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
 
     virtual qpid::management::ManagementObject* GetManagementObject() const;
     virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args&
args, std::string& text);

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h?rev=724371&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h Mon Dec  8 06:57:05 2008
@@ -0,0 +1,90 @@
+#ifndef QPID_CLUSTER_CONNECTIONMAP_H
+#define QPID_CLUSTER_CONNECTIONMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "types.h"
+#include "Connection.h"
+#include "ClusterMap.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Thread safe map of connections.
+ */
+class ConnectionMap
+{
+  public:
+    typedef boost::intrusive_ptr<cluster::Connection> ConnectionPtr;
+    typedef std::vector<ConnectionPtr> Vector;
+    
+    void insert(ConnectionId id, ConnectionPtr p) {
+        ScopedLock l(lock);
+        map.insert(Map::value_type(id,p));
+    }
+
+    void erase(ConnectionId id) {
+        ScopedLock l(lock);
+        map.erase(id);
+    }
+
+    ConnectionPtr find(ConnectionId id) const {
+        ScopedLock l(lock);
+        Map::const_iterator i = map.find(id);
+        return i == map.end() ? ConnectionPtr() : i->second;
+    }
+
+    Vector values() const {
+        Vector result(map.size());
+        std::transform(map.begin(), map.end(), result.begin(),
+                       boost::bind(&Map::value_type::second, _1));
+        return result;
+    }
+
+    void update(MemberId myId, const ClusterMap& cluster) {
+        for (Map::iterator i = map.begin(); i != map.end(); ) {
+            MemberId member = i->first.getMember();
+            if (member != myId && !cluster.isMember(member)) { 
+                i->second->left();
+                map.erase(i++);
+            } else {
+                i++;
+            }
+        }
+    }
+
+    size_t size() const { return map.size(); }
+  private:
+    typedef std::map<ConnectionId, ConnectionPtr> Map;
+    typedef sys::Mutex::ScopedLock ScopedLock;
+    
+    mutable sys::Mutex lock;
+    Map map;
+};
+
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CONNECTIONMAP_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message