qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject svn commit: r1494639 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/broker/windows/ qpid/ha/ qpid/management/ qpid/sys/
Date Wed, 19 Jun 2013 14:26:04 GMT
Author: astitcher
Date: Wed Jun 19 14:26:03 2013
New Revision: 1494639

URL: http://svn.apache.org/r1494639
Log:
QPID-4905: Tidy up broker::Connection
- Clean up code for accounting for sent frames
- merged Connection and ConnectionState into Connection

Removed:
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
    qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed Jun 19 14:26:03 2013
@@ -1195,7 +1195,6 @@ set (qpidbroker_SOURCES
      qpid/broker/Bridge.cpp
      qpid/broker/Connection.cpp
      qpid/broker/ConnectionHandler.cpp
-     qpid/broker/ConnectionState.cpp
      qpid/broker/DeliverableMessage.cpp
      qpid/broker/DeliveryRecord.cpp
      qpid/broker/DirectExchange.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Jun 19 14:26:03 2013
@@ -482,7 +482,6 @@ libqpidcommon_la_SOURCES +=			\
   qpid/sys/ConnectionInputHandler.h		\
   qpid/sys/ConnectionInputHandlerFactory.h	\
   qpid/sys/ConnectionOutputHandler.h		\
-  qpid/sys/ConnectionOutputHandlerPtr.h		\
   qpid/sys/CopyOnWriteArray.h			\
   qpid/sys/DeletionManager.h			\
   qpid/sys/DispatchHandle.cpp			\
@@ -592,8 +591,6 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/Connection.h \
   qpid/broker/ConnectionHandler.cpp \
   qpid/broker/ConnectionHandler.h \
-  qpid/broker/ConnectionState.cpp \
-  qpid/broker/ConnectionState.h \
   qpid/broker/ConnectionToken.h \
   qpid/broker/Consumer.h \
   qpid/broker/Credit.h \

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Wed Jun 19 14:26:03 2013
@@ -22,7 +22,6 @@
 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/FedOps.h"
-#include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/LinkRegistry.h"
@@ -73,7 +72,7 @@ Bridge::Bridge(const std::string& _name,
     queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag()
               : _queueName),
     altEx(ae), persistenceId(0),
-    connState(0), conn(0), initialize(init), detached(false),
+    conn(0), initialize(init), detached(false),
     useExistingQueue(!_queueName.empty()),
     sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
 {
@@ -104,7 +103,6 @@ Bridge::~Bridge()
 void Bridge::create(Connection& c)
 {
     detached = false;           // Reset detached in case we are recovering.
-    connState = &c;
     conn = &c;
 
     SessionHandler& sessionHandler = c.getChannel(channel);
@@ -363,7 +361,7 @@ void Bridge::propagateBinding(const stri
                               qpid::framing::FieldTable* extra_args)
 {
     const string& localTag = link->getBroker()->getFederationTag();
-    const string& peerTag  = connState->getFederationPeerTag();
+    const string& peerTag  = conn->getFederationPeerTag();
 
     if (tagList.find(peerTag) == tagList.npos) {
          FieldTable bindArgs;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Wed Jun 19 14:26:03 2013
@@ -41,7 +41,6 @@ namespace qpid {
 namespace broker {
 
 class Connection;
-class ConnectionState;
 class Link;
 class LinkRegistry;
 
@@ -135,7 +134,6 @@ class Bridge : public PersistableConfig,
     std::string queueName;
     std::string altEx;
     mutable uint64_t  persistenceId;
-    ConnectionState* connState;
     Connection* conn;
     InitializeCallback initialize;
     bool detached;              // Set when session is detached.

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jun 19 14:26:03 2013
@@ -22,7 +22,7 @@
 #include "qpid/broker/Broker.h"
 
 #include "qpid/broker/AclModule.h"
-#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
@@ -77,7 +77,6 @@
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Time.h"
 #include "qpid/sys/Timer.h"
-#include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/sys/SystemInfo.h"
 #include "qpid/Address.h"
@@ -708,7 +707,7 @@ struct InvalidParameter : public qpid::E
 };
 
 void Broker::createObject(const std::string& type, const std::string& name,
-                          const Variant::Map& properties, bool /*strict*/, const ConnectionState*
context)
+                          const Variant::Map& properties, bool /*strict*/, const Connection*
context)
 {
     std::string userId;
     std::string connectionId;
@@ -899,7 +898,7 @@ void Broker::createObject(const std::str
 }
 
 void Broker::deleteObject(const std::string& type, const std::string& name,
-                          const Variant::Map& options, const ConnectionState* context)
+                          const Variant::Map& options, const Connection* context)
 {
     std::string userId;
     std::string connectionId;
@@ -953,7 +952,7 @@ void Broker::checkDeleteQueue(Queue::sha
 Manageable::status_t Broker::queryObject(const std::string& type,
                                          const std::string& name,
                                          Variant::Map& results,
-                                         const ConnectionState* context)
+                                         const Connection* context)
 {
     std::string userId;
     std::string connectionId;
@@ -995,7 +994,7 @@ Manageable::status_t Broker::queryQueue(
 }
 
 Manageable::status_t Broker::getTimestampConfig(bool& receive,
-                                                const ConnectionState* context)
+                                                const Connection* context)
 {
     std::string name;   // none needed for broker
     std::string userId = context->getUserId();
@@ -1007,7 +1006,7 @@ Manageable::status_t Broker::getTimestam
 }
 
 Manageable::status_t Broker::setTimestampConfig(const bool receive,
-                                                const ConnectionState* context)
+                                                const Connection* context)
 {
     std::string name;   // none needed for broker
     std::string userId = context->getUserId();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Jun 19 14:26:03 2013
@@ -63,7 +63,7 @@ struct Url;
 namespace broker {
 
 class AclModule;
-class ConnectionState;
+class Connection;
 class ExpiryPolicy;
 class Message;
 struct QueueSettings;
@@ -151,20 +151,20 @@ class Broker : public sys::Runnable, pub
     void setLogHiresTimestamp(bool enabled);
     bool getLogHiresTimestamp();
     void createObject(const std::string& type, const std::string& name,
-                      const qpid::types::Variant::Map& properties, bool strict, const
ConnectionState* context);
+                      const qpid::types::Variant::Map& properties, bool strict, const
Connection* context);
     void deleteObject(const std::string& type, const std::string& name,
-                      const qpid::types::Variant::Map& options, const ConnectionState*
context);
+                      const qpid::types::Variant::Map& options, const Connection* context);
     void checkDeleteQueue(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
     Manageable::status_t queryObject(const std::string& type, const std::string&
name,
-                                     qpid::types::Variant::Map& results, const ConnectionState*
context);
+                                     qpid::types::Variant::Map& results, const Connection*
context);
     Manageable::status_t queryQueue( const std::string& name,
                                      const std::string& userId,
                                      const std::string& connectionId,
                                      qpid::types::Variant::Map& results);
     Manageable::status_t getTimestampConfig(bool& receive,
-                                            const ConnectionState* context);
+                                            const Connection* context);
     Manageable::status_t setTimestampConfig(const bool receive,
-                                            const ConnectionState* context);
+                                            const Connection* context);
     Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string&
tgtQueue);
     void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue>
tgtQ, bool moveMsgs);
     boost::shared_ptr<sys::Poller> poller;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed Jun 19 14:26:03 2013
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/broker/Connection.h"
+
 #include "qpid/broker/ConnectionObserver.h"
 #include "qpid/broker/SessionOutputException.h"
 #include "qpid/broker/SessionState.h"
@@ -26,6 +27,7 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/sys/SecuritySettings.h"
 #include "qpid/sys/Timer.h"
 
@@ -80,6 +82,47 @@ struct ConnectionTimeoutTask : public sy
         connection.abort();
     }
 };
+/**
+ * A ConnectionOutputHandler that delegates to another
+ * ConnectionOutputHandler.  Allows you to inspect outputting frames
+ */
+class FrameInspector : public sys::ConnectionOutputHandler
+{
+public:
+    FrameInspector(ConnectionOutputHandler* p, framing::FrameHandler* i) :
+        next(p),
+        intercepter(i)
+    {
+        assert(next);
+        assert(intercepter);
+    }
+
+    void close() { next->close(); }
+    void abort() { next->abort(); }
+    void connectionEstablished() { next->connectionEstablished(); }
+    void activateOutput() { next->activateOutput(); }
+    void handle(framing::AMQFrame& f) { intercepter->handle(f); next->handle(f);
}
+
+private:
+    ConnectionOutputHandler* next;
+    framing::FrameHandler* intercepter;
+};
+
+/**
+ * Chained ConnectionOutputHandler that allows outgoing frames to be
+ * tracked (for updating mgmt stats).
+ */
+class OutboundFrameTracker : public framing::FrameHandler
+{
+public:
+    OutboundFrameTracker(Connection& _con) : con(_con) {}
+    void handle(framing::AMQFrame& f)
+    {
+        con.sent(f);
+    }
+private:
+    Connection& con;
+};
 
 Connection::Connection(ConnectionOutputHandler* out_,
                        Broker& broker_, const
@@ -88,19 +131,24 @@ Connection::Connection(ConnectionOutputH
                        bool link_,
                        uint64_t objectId_
 ) :
-    ConnectionState(out_, broker_),
+    outboundTracker(new OutboundFrameTracker(*this)),
+    out(new FrameInspector(out_, outboundTracker.get())),
+    broker(broker_),
+    framemax(65535),
+    heartbeat(0),
+    heartbeatmax(120),
+    userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by
federation links)
+    isDefaultRealm(false),
     securitySettings(external),
-    adapter(*this, link_),
     link(link_),
+    adapter(*this, link),
     mgmtClosing(false),
     mgmtId(mgmtId_),
     links(broker_.getLinks()),
     agent(0),
     timer(broker_.getTimer()),
-    objectId(objectId_),
-    outboundTracker(*this)
+    objectId(objectId_)
 {
-    outboundTracker.wrap(out);
     broker.getConnectionObservers().connection(*this);
     assert(agent == 0);
     assert(mgmtObject == 0);
@@ -112,7 +160,7 @@ Connection::Connection(ConnectionOutputH
             mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent,
mgmtId, !link, false, "AMQP 0-10"));
             agent->addObject(mgmtObject, objectId);
         }
-        ConnectionState::setUrl(mgmtId);
+        setUrl(mgmtId);
     }
 }
 
@@ -120,15 +168,15 @@ void Connection::requestIOProcessing(boo
 {
     ScopedLock<Mutex> l(ioCallbackLock);
     ioCallbacks.push(callback);
-    if (isOpen()) out.activateOutput();
+    if (isOpen()) out->activateOutput();
 }
 
 Connection::~Connection()
 {
     if (mgmtObject != 0) {
         if (!link)
-            agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId(),
mgmtObject->get_remoteProperties()));
-        QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId()
+            agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, getUserId(), mgmtObject->get_remoteProperties()));
+        QPID_LOG_CAT(debug, model, "Delete connection. user:" << getUserId()
             << " rhost:" << mgmtId );
         mgmtObject->resourceDestroy();
     }
@@ -260,14 +308,19 @@ void Connection::notifyConnectionForced(
     broker.getConnectionObservers().forced(*this, text);
 }
 
-void Connection::setUserId(const string& userId)
+void Connection::setUserId(const string& uid)
 {
-    ConnectionState::setUserId(userId);
+    userId = uid;
+    size_t at = userId.find('@');
+    userName = userId.substr(0, at);
+    isDefaultRealm = (
+        at!= std::string::npos &&
+        getBroker().getOptions().realm == userId.substr(at+1,userId.size()));
 }
 
 void Connection::setUserProxyAuth(bool b)
 {
-    ConnectionState::setUserProxyAuth(b);
+    userProxyAuth = b;
     if (mgmtObject != 0)
         mgmtObject->set_userProxyAuth(b);
 }
@@ -286,7 +339,22 @@ void Connection::close(connection::Close
     //make sure we delete dangling pointers from outputTasks before deleting sessions
     outputTasks.removeAll();
     channels.clear();
-    getOutput().close();
+    out->close();
+}
+
+void Connection::activateOutput()
+{
+    out->activateOutput();
+}
+
+void Connection::addOutputTask(OutputTask* t)
+{
+    outputTasks.addOutputTask(t);
+}
+
+void Connection::removeOutputTask(OutputTask* t)
+{
+    outputTasks.removeOutputTask(t);
 }
 
 void Connection::closed(){ // Physically closed, suspend open sessions.
@@ -371,7 +439,7 @@ Manageable::status_t Connection::Managem
     case _qmf::Connection::METHOD_CLOSE :
         mgmtClosing = true;
         if (mgmtObject != 0) mgmtObject->set_closing(1);
-        out.activateOutput();
+        out->activateOutput();
         status = Manageable::STATUS_OK;
         break;
     }
@@ -435,7 +503,7 @@ void Connection::abort()
     if (heartbeatTimer)
         heartbeatTimer->cancel();
 
-    out.abort();
+    out->abort();
 }
 
 void Connection::setHeartbeatInterval(uint16_t heartbeat)
@@ -451,7 +519,7 @@ void Connection::setHeartbeatInterval(ui
             timer.add(timeoutTimer);
         }
     }
-    out.connectionEstablished();
+    out->connectionEstablished();
 }
 
 void Connection::startLinkHeartbeatTimeoutTask() {
@@ -459,7 +527,7 @@ void Connection::startLinkHeartbeatTimeo
         linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this);
         timer.add(linkHeartbeatTimer);
     }
-    out.connectionEstablished();
+    out->connectionEstablished();
 }
 
 void Connection::restartTimeout()
@@ -474,20 +542,4 @@ void Connection::restartTimeout()
 
 bool Connection::isOpen() { return adapter.isOpen(); }
 
-Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con),
next(0) {}
-void Connection::OutboundFrameTracker::close() { next->close(); }
-void Connection::OutboundFrameTracker::abort() { next->abort(); }
-void Connection::OutboundFrameTracker::connectionEstablished() { next->connectionEstablished();
}
-void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
-void Connection::OutboundFrameTracker::handle(framing::AMQFrame& f)
-{
-    next->handle(f);
-    con.sent(f);
-}
-void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p)
-{
-    next = p.get();
-    p.set(this);
-}
-
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Wed Jun 19 14:26:03 2013
@@ -28,23 +28,29 @@
 #include <queue>
 
 #include "qpid/broker/BrokerImportExport.h"
+
 #include "qpid/broker/ConnectionHandler.h"
-#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/ConnectionToken.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/sys/AggregateOutput.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/SecuritySettings.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/RefCounted.h"
+#include "qpid/Url.h"
 #include "qpid/ptr_map.h"
 
 #include "qmf/org/apache/qpid/broker/Connection.h"
 
 #include <boost/ptr_container/ptr_map.hpp>
+#include <boost/scoped_ptr.hpp>
 #include <boost/bind.hpp>
 
 #include <algorithm>
 
 namespace qpid {
 namespace sys {
+class ConnectionOutputHandler;
 class Timer;
 class TimerTask;
 }
@@ -58,10 +64,45 @@ class SessionHandler;
 struct ConnectionTimeoutTask;
 
 class Connection : public sys::ConnectionInputHandler,
-                   public ConnectionState,
+                   public ConnectionToken, public management::Manageable,
                    public RefCounted
 {
   public:
+    uint32_t getFrameMax() const { return framemax; }
+    uint16_t getHeartbeat() const { return heartbeat; }
+    uint16_t getHeartbeatMax() const { return heartbeatmax; }
+
+    void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
+    void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+    void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
+
+    const std::string& getUserId() const { return userId; }
+
+    void setUrl(const std::string& _url) { url = _url; }
+    const std::string& getUrl() const { return url; }
+
+    void setUserProxyAuth(const bool b);
+    bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() >
0; } // links can proxy msgs with non-matching auth ids
+    bool isFederationLink() const { return federationPeerTag.size() > 0; }
+    void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag);
}
+    const std::string& getFederationPeerTag() const { return federationPeerTag; }
+    std::vector<Url>& getKnownHosts() { return knownHosts; }
+
+    /**@return true if user is the authenticated user on this connection.
+     * If id has the default realm will also compare plain username.
+     */
+    bool isAuthenticatedUser(const std::string& id) const {
+        return (id == userId || (isDefaultRealm && id == userName));
+    }
+
+    Broker& getBroker() { return broker; }
+
+    sys::ConnectionOutputHandler& getOutput() { return *out; }
+    void activateOutput();
+    void addOutputTask(OutputTask*);
+    void removeOutputTask(OutputTask*);
+    framing::ProtocolVersion getVersion() const { return version; }
+
     Connection(sys::ConnectionOutputHandler* out,
                Broker& broker,
                const std::string& mgmtId,
@@ -111,10 +152,8 @@ class Connection : public sys::Connectio
     void setUserId(const std::string& uid);
 
     // credentials for connected client
-    const std::string& getUserId() const { return ConnectionState::getUserId(); }
     const std::string& getMgmtId() const { return mgmtId; }
     management::ManagementAgent* getAgent() const { return agent; }
-    void setUserProxyAuth(bool b);
 
     void setHeartbeatInterval(uint16_t heartbeat);
     void sendHeartbeat();
@@ -137,18 +176,39 @@ class Connection : public sys::Connectio
     const framing::FieldTable& getClientProperties() const { return clientProperties;
}
 
   private:
+    // Management object is used in the constructor so must be early
+    qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
+
+    //contained output tasks
+    sys::AggregateOutput outputTasks;
+
+    boost::scoped_ptr<framing::FrameHandler> outboundTracker;
+    boost::scoped_ptr<sys::ConnectionOutputHandler> out;
+
+    Broker& broker;
+
+    framing::ProtocolVersion version;
+    uint32_t framemax;
+    uint16_t heartbeat;
+    uint16_t heartbeatmax;
+    std::string userId;
+    std::string url;
+    bool userProxyAuth;
+    std::string federationPeerTag;
+    std::vector<Url> knownHosts;
+    std::string userName;
+    bool isDefaultRealm;
+
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
-    typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator;
 
     ChannelMap channels;
     qpid::sys::SecuritySettings securitySettings;
-    ConnectionHandler adapter;
     const bool link;
+    ConnectionHandler adapter;
     bool mgmtClosing;
     const std::string mgmtId;
     sys::Mutex ioCallbackLock;
     std::queue<boost::function0<void> > ioCallbacks;
-    qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
     LinkRegistry& links;
     management::ManagementAgent* agent;
     sys::Timer& timer;
@@ -157,25 +217,7 @@ class Connection : public sys::Connectio
     uint64_t objectId;
     framing::FieldTable clientProperties;
 
-    /**
-     * Chained ConnectionOutputHandler that allows outgoing frames to be
-     * tracked (for updating mgmt stats).
-     */
-    class OutboundFrameTracker : public sys::ConnectionOutputHandler
-    {
-      public:
-        OutboundFrameTracker(Connection&);
-        void close();
-        void abort();
-        void connectionEstablished();
-        void activateOutput();
-        void handle(framing::AMQFrame&);
-        void wrap(sys::ConnectionOutputHandlerPtr&);
-      private:
-        Connection& con;
-        sys::ConnectionOutputHandler* next;
-    };
-    OutboundFrameTracker outboundTracker;
+friend class OutboundFrameTracker;
 
     void sent(const framing::AMQFrame& f);
     void doIoCallbacks();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Wed Jun 19 14:26:03 2013
@@ -33,6 +33,7 @@
 #include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
 #include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/sys/SecurityLayer.h"
 #include "qpid/sys/Time.h"
 #include "qpid/broker/AclModule.h"
@@ -254,7 +255,7 @@ void ConnectionHandler::Handler::tuneOk(
 void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
                                       const framing::Array& /*capabilities*/, bool /*insist*/)
 {
-    std::vector<Url> urls = connection.broker.getKnownBrokers();
+    std::vector<Url> urls = connection.getBroker().getKnownBrokers();
     framing::Array array(0x95); // str16 array
     for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i)
         array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Wed Jun 19 14:26:03 2013
@@ -21,7 +21,7 @@
 
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/SessionContext.h"
-#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/Connection.h"
 
 namespace qpid {
 namespace broker {
@@ -40,7 +40,7 @@ class HandlerImpl {
     HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
 
     framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
-    ConnectionState& getConnection() { return session.getConnection(); }
+    Connection& getConnection() { return session.getConnection(); }
     Broker& getBroker() { return session.getConnection().getBroker(); }
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Wed Jun 19 14:26:03 2013
@@ -22,9 +22,10 @@
 #include "qpid/broker/AclModule.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
-#include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/FieldValue.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/sys/SecuritySettings.h"
 
 #include <boost/format.hpp>

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Jun 19 14:26:03 2013
@@ -42,6 +42,7 @@
 #include "qpid/ptr_map.h"
 #include "qpid/broker/AclModule.h"
 #include "qpid/broker/FedOps.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
 
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
@@ -439,7 +440,7 @@ void SemanticState::disable(ConsumerImpl
 {
     c->disableNotify();
     if (session.isAttached())
-        session.getConnection().outputTasks.removeOutputTask(c.get());
+        session.getConnection().removeOutputTask(c.get());
 }
 
 void SemanticState::cancel(ConsumerImpl::shared_ptr c)
@@ -505,8 +506,8 @@ void SemanticState::requestDispatch()
 void SemanticStateConsumerImpl::requestDispatch()
 {
     if (blocked) {
-        parent->session.getConnection().outputTasks.addOutputTask(this);
-        parent->session.getConnection().getOutput().activateOutput();
+        parent->session.getConnection().addOutputTask(this);
+        parent->session.getConnection().activateOutput();
         blocked = false;
     }
 }
@@ -735,8 +736,8 @@ void SemanticStateConsumerImpl::notify()
 {
     Mutex::ScopedLock l(lock);
     if (notifyEnabled) {
-        parent->session.getConnection().outputTasks.addOutputTask(this);
-        parent->session.getConnection().getOutput().activateOutput();
+        parent->session.getConnection().addOutputTask(this);
+        parent->session.getConnection().activateOutput();
     }
 }
 
@@ -804,16 +805,16 @@ void SemanticState::attached()
 {
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
         i->second->enableNotify();
-        session.getConnection().outputTasks.addOutputTask(i->second.get());
+        session.getConnection().addOutputTask(i->second.get());
     }
-    session.getConnection().getOutput().activateOutput();
+    session.getConnection().activateOutput();
 }
 
 void SemanticState::detached()
 {
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
         i->second->disableNotify();
-        session.getConnection().outputTasks.removeOutputTask(i->second.get());
+        session.getConnection().removeOutputTask(i->second.get());
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Wed Jun 19 14:26:03 2013
@@ -36,7 +36,7 @@ class AMQP_ClientProxy;
 namespace broker {
 
 class Broker;
-class ConnectionState;
+class Connection;
 
 class SessionContext : public OwnershipToken
 {
@@ -44,7 +44,7 @@ class SessionContext : public OwnershipT
     virtual ~SessionContext(){}
     virtual bool isLocal(const ConnectionToken* t) const = 0;
     virtual bool isAttached() const = 0;
-    virtual ConnectionState& getConnection() = 0;
+    virtual Connection& getConnection() = 0;
     virtual framing::AMQP_ClientProxy& getProxy() = 0;
     virtual Broker& getBroker() = 0;
     virtual uint16_t getChannel() const = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Jun 19 14:26:03 2013
@@ -23,6 +23,7 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
 
 #include <boost/bind.hpp>
 
@@ -63,9 +64,9 @@ void SessionHandler::executionException(
         errorListener->executionException(code, msg);
 }
 
-ConnectionState& SessionHandler::getConnection() { return connection; }
+Connection& SessionHandler::getConnection() { return connection; }
 
-const ConnectionState& SessionHandler::getConnection() const { return connection; }
+const Connection& SessionHandler::getConnection() const { return connection; }
 
 void SessionHandler::handleDetach() {
     qpid::amqp_0_10::SessionHandler::handleDetach();
@@ -80,7 +81,7 @@ void SessionHandler::handleDetach() {
 void SessionHandler::setState(const std::string& name, bool force) {
     assert(!session.get());
     SessionId id(connection.getUserId(), name);
-    session = connection.broker.getSessionManager().attach(*this, id, force);
+    session = connection.getBroker().getSessionManager().attach(*this, id, force);
 }
 
 void SessionHandler::detaching()
@@ -102,7 +103,7 @@ void SessionHandler::readyToSend() {
 void SessionHandler::attachAs(const std::string& name)
 {
     SessionId id(connection.getUserId(), name);
-    SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
+    SessionState::Configuration config = connection.getBroker().getSessionManager().getSessionConfig();
     session.reset(new SessionState(connection.getBroker(), *this, id, config));
     sendAttach(false);
 }
@@ -118,7 +119,7 @@ void SessionHandler::attached(const std:
         qpid::amqp_0_10::SessionHandler::attached(name);
     } else {
         SessionId id(connection.getUserId(), name);
-        SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
+        SessionState::Configuration config = connection.getBroker().getSessionManager().getSessionConfig();
         session.reset(new SessionState(connection.getBroker(), *this, id, config));
         markReadyToSend();
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Wed Jun 19 14:26:03 2013
@@ -33,7 +33,6 @@ class SessionState;
 namespace broker {
 
 class Connection;
-class ConnectionState;
 class SessionState;
 
 /**
@@ -65,8 +64,8 @@ class SessionHandler : public qpid::amqp
     SessionState* getSession() { return session.get(); }
     const SessionState* getSession() const { return session.get(); }
 
-    ConnectionState& getConnection();
-    const ConnectionState& getConnection() const;
+    Connection& getConnection();
+    const Connection& getConnection() const;
 
     framing::AMQP_ClientProxy& getProxy() { return proxy; }
     const framing::AMQP_ClientProxy& getProxy() const { return proxy; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Jun 19 14:26:03 2013
@@ -20,7 +20,6 @@
  */
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/SessionManager.h"
@@ -97,7 +96,7 @@ uint16_t SessionState::getChannel() cons
     return handler->getChannel();
 }
 
-ConnectionState& SessionState::getConnection() {
+Connection& SessionState::getConnection() {
     assert(isAttached());
     return handler->getConnection();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed Jun 19 14:26:03 2013
@@ -89,7 +89,7 @@ class SessionState : public qpid::Sessio
     uint16_t getChannel() const;
 
     /** @pre isAttached() */
-    ConnectionState& getConnection();
+    Connection& getConnection();
     bool isLocal(const ConnectionToken* t) const;
 
     Broker& getBroker();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp Wed Jun 19 14:26:03
2013
@@ -27,6 +27,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/FieldValue.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
 
 #include <windows.h>
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Wed Jun 19 14:26:03 2013
@@ -31,7 +31,6 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/SessionContext.h"
-#include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed Jun 19 14:26:03 2013
@@ -37,7 +37,7 @@
 #include "qpid/sys/Timer.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/PollableQueue.h"
-#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/broker/AclModule.h"
 #include "qpid/types/Variant.h"
 #include "qpid/types/Uuid.h"
@@ -88,7 +88,7 @@ const string keyifyNameStr(const string&
 
 struct ScopedManagementContext
 {
-    ScopedManagementContext(const qpid::broker::ConnectionState* context)
+    ScopedManagementContext(const qpid::broker::Connection* context)
     {
         setManagementExecutionContext(context);
     }
@@ -1286,7 +1286,7 @@ void ManagementAgent::handleMethodReques
         return;
     }
 
-    string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
+    string userId = ((const qpid::broker::Connection*) connToken)->getUserId();
     if (acl != 0) {
         map<acl::Property, string> params;
         params[acl::PROP_SCHEMAPACKAGE] = packageName;
@@ -1407,7 +1407,7 @@ void ManagementAgent::handleMethodReques
         return;
     }
 
-    string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
+    string userId = ((const qpid::broker::Connection*) connToken)->getUserId();
     if (acl != 0) {
         map<acl::Property, string> params;
         params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName();
@@ -1723,7 +1723,7 @@ void ManagementAgent::handleAttachReques
     string   label;
     uint32_t requestedBrokerBank, requestedAgentBank;
     uint32_t assignedBank;
-    ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
+    ObjectId connectionRef = ((const Connection*) connToken)->GetManagementObject()->getObjectId();
     Uuid     systemId;
 
     moveNewObjects();
@@ -2206,7 +2206,7 @@ bool ManagementAgent::authorizeAgentMess
         if (acl == 0)
             return true;
 
-        string  userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId();
+        string  userId = ((const qpid::broker::Connection*) msg.getPublisher())->getUserId();
         params[acl::PROP_SCHEMAPACKAGE] = packageName;
         params[acl::PROP_SCHEMACLASS]   = className;
 
@@ -2276,7 +2276,7 @@ void ManagementAgent::dispatchAgentComma
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
-    ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
+    ScopedManagementContext context((const qpid::broker::Connection*) msg.getPublisher());
     const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
     if (headers && p->getAppId() == "qmf2")
     {
@@ -2755,14 +2755,14 @@ ManagementAgent::EventQueue::Batch::cons
 }
 
 namespace {
-QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
+QPID_TSS const qpid::broker::Connection* executionContext = 0;
 }
 
-void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt)
+void setManagementExecutionContext(const qpid::broker::Connection* ctxt)
 {
     executionContext = ctxt;
 }
-const qpid::broker::ConnectionState* getManagementExecutionContext()
+const qpid::broker::Connection* getManagementExecutionContext()
 {
     return executionContext;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1494639&r1=1494638&r2=1494639&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Wed Jun 19 14:26:03 2013
@@ -44,7 +44,7 @@
 
 namespace qpid {
 namespace broker {
-class ConnectionState;
+class Connection;
 }
 namespace sys {
 class Timer;
@@ -379,8 +379,8 @@ private:
     std::auto_ptr<EventQueue> sendQueue;
 };
 
-void setManagementExecutionContext(const qpid::broker::ConnectionState*);
-const qpid::broker::ConnectionState* getManagementExecutionContext();
+void setManagementExecutionContext(const qpid::broker::Connection*);
+const qpid::broker::Connection* getManagementExecutionContext();
 }}
 
 #endif  /*!_ManagementAgent_*/



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message