qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1333466 - in /qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker: Bridge.cpp Bridge.h Broker.cpp Link.cpp Link.h
Date Thu, 03 May 2012 14:12:54 GMT
Author: kgiusti
Date: Thu May  3 14:12:54 2012
New Revision: 1333466

URL: http://svn.apache.org/viewvc?rev=1333466&view=rev
Log:
QPID-3767: fix remote session and queue name to be unique

Modified:
    qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Bridge.h
    qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.h

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1333466&r1=1333465&r2=1333466&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Bridge.cpp Thu May  3 14:12:54 2012
@@ -60,20 +60,18 @@ void Bridge::PushHandler::handle(framing
 Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
                CancellationListener l, const _qmf::ArgsLinkBridge& _args,
                InitializeCallback init) :
-    link(_link), id(_id), args(_args), mgmtObject(0),
+    link(_link), channel(_id), args(_args), mgmtObject(0),
     listener(l), name(_name), queueName("qpid.bridge_queue_"), persistenceId(0),
     initialize(init), detached(false)
 {
-    std::stringstream title;
-    title << id << "_" << name;
-    queueName += title.str();
+    queueName += Uuid(true).str();
     ManagementAgent* agent = link->getBroker()->getManagementAgent();
     if (agent != 0) {
         mgmtObject = new _qmf::Bridge
             (agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
              args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
              args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
-        mgmtObject->set_channelId(id);
+        mgmtObject->set_channelId(channel);
         agent->addObject(mgmtObject);
     }
     QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src
<< " to " << args.i_dest);
@@ -91,7 +89,7 @@ void Bridge::create(Connection& c)
     conn = &c;
     FieldTable options;
     if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
-    SessionHandler& sessionHandler = c.getChannel(id);
+    SessionHandler& sessionHandler = c.getChannel(channel);
     sessionHandler.setDetachedCallback(
         boost::bind(&Bridge::sessionDetached, shared_from_this()));
     if (args.i_srcIsLocal) {
@@ -99,15 +97,15 @@ void Bridge::create(Connection& c)
             throw Exception("Dynamic routing not supported for push routes");
         // Point the bridging commands at the local connection handler
         pushHandler.reset(new PushHandler(&c));
-        channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
+        channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));
 
         session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
         peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
 
-        session->attach(name, false);
+        session->attach(queueName, false);
         session->commandPoint(0,0);
     } else {
-        sessionHandler.attachAs(name);
+        sessionHandler.attachAs(queueName);
         // Point the bridging commands at the remote peer broker
         peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
     }
@@ -217,12 +215,8 @@ Bridge::shared_ptr Bridge::decode(LinkRe
     Link::shared_ptr link;
     if (kind == ENCODED_IDENTIFIER_V1) {
         /** previous versions identified the bridge by host:port, not by name, and
-         * transport wasn't provided.  So create a unique name for the new bridge.
+         * transport wasn't provided.  Try to find a link using those paramters.
          */
-
-        framing::Uuid uuid(true);
-        name = QPID_NAME_PREFIX + uuid.str();
-
         buffer.getShortString(host);
         port = buffer.getShort();
 
@@ -254,6 +248,12 @@ Bridge::shared_ptr Bridge::decode(LinkRe
     bool dynamic(buffer.getOctet());
     uint16_t sync = buffer.getShort();
 
+    if (kind == ENCODED_IDENTIFIER_V1) {
+        /** previous versions did not provide a name for the bridge, so create one
+         */
+        name = createName(link->getName(), src, dest, key);
+    }
+
     return links.declare(name, *link, durable, src, dest, key, is_queue,
                          is_local, id, excludes, dynamic, sync).first;
 }
@@ -351,7 +351,7 @@ void Bridge::sendReorigin()
 }
 bool Bridge::resetProxy()
 {
-    SessionHandler& sessionHandler = conn->getChannel(id);
+    SessionHandler& sessionHandler = conn->getChannel(channel);
     if (!sessionHandler.getSession()) peer.reset();
     else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
     return peer.get();
@@ -381,4 +381,14 @@ void Bridge::sessionDetached() {
     detached = true;
 }
 
+std::string Bridge::createName(const std::string& linkName,
+                               const std::string& src,
+                               const std::string& dest,
+                               const std::string& key)
+{
+    std::stringstream keystream;
+    keystream << linkName << "!" << src << "!" << dest <<
"!" << key;
+    return keystream.str();
+}
+
 }}

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Bridge.h?rev=1333466&r1=1333465&r2=1333466&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Bridge.h Thu May  3 14:12:54 2012
@@ -98,6 +98,12 @@ public:
     std::string getQueueName() const { return queueName; }
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
 
+    /** create a name for a bridge (if none supplied by user config) */
+    static std::string createName(const std::string& linkName,
+                                  const std::string& src,
+                                  const std::string& dest,
+                                  const std::string& key);
+
 private:
     // Callback when the bridge's session is detached.
     void sessionDetached();
@@ -114,7 +120,7 @@ private:
     std::auto_ptr<framing::AMQP_ServerProxy>          peer;
 
     Link* const link;
-    framing::ChannelId          id;
+    const framing::ChannelId          channel;
     qmf::org::apache::qpid::broker::ArgsLinkBridge args;
     qmf::org::apache::qpid::broker::Bridge*        mgmtObject;
     CancellationListener        listener;

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1333466&r1=1333465&r2=1333466&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Broker.cpp Thu May  3 14:12:54 2012
@@ -459,7 +459,7 @@ Manageable::status_t Broker::ManagementM
          * "create()" broker method if these features are needed.
          * TBD: deprecate this interface.
          */
-        QPID_LOG(warning, "The Broker::connect() method will be removed in a future release
of QPID."
+        QPID_LOG(info, "The Broker::connect() method will be removed in a future release
of QPID."
                  " Please use the Broker::create() method with type='link' instead.");
         _qmf::ArgsBrokerConnect& hp=
             dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
@@ -477,9 +477,9 @@ Manageable::status_t Broker::ManagementM
         // - this behavior is backward compatible with previous releases.
         if (!links.getLink(hp.i_host, hp.i_port, transport)) {
             // new link, need to generate a unique name for it
-            framing::Uuid uuid(true);
             std::pair<Link::shared_ptr, bool> response =
-              links.declare(QPID_NAME_PREFIX + uuid.str(), hp.i_host, hp.i_port, transport,
+              links.declare(Link::createName(transport, hp.i_host, hp.i_port),
+                            hp.i_host, hp.i_port, transport,
                             hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password);
             if (!response.first) {
                 text = "Unable to create Link";

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.cpp?rev=1333466&r1=1333465&r2=1333466&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.cpp Thu May  3 14:12:54 2012
@@ -175,7 +175,7 @@ Link::Link(const string&  _name,
     broker->getTimer().add(timerTask);
 
     stringstream exchangeName;
-    exchangeName << "qpid.link." << transport << ":" << host <<
":" << port;
+    exchangeName << "qpid.link." << name;
     std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(exchangeName.str(),
                                                                               exchangeTypeName);
     failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
@@ -575,13 +575,8 @@ Link::shared_ptr Link::decode(LinkRegist
     string   password;
     string   name;
 
-    if (kind == ENCODED_IDENTIFIER_V1) {
-        /** previous versions identified the Link by host:port, there was no name
-         * assigned.  So create a unique name for the new Link.
-         */
-        framing::Uuid uuid(true);
-        name = QPID_NAME_PREFIX + uuid.str();
-    } else {
+    if (kind == ENCODED_IDENTIFIER) {
+        // newer version provides a link name.
         buffer.getShortString(name);
     }
     buffer.getShortString(host);
@@ -592,6 +587,13 @@ Link::shared_ptr Link::decode(LinkRegist
     buffer.getShortString(username);
     buffer.getShortString(password);
 
+    if (kind == ENCODED_IDENTIFIER_V1) {
+        /** previous versions identified the Link by host:port, there was no name
+         * assigned.  So create a name for the new Link.
+         */
+        name = createName(transport, host, port);
+    }
+
     return links.declare(name, host, port, transport, durable, authMechanism,
                          username, password).first;
 }
@@ -652,7 +654,7 @@ Manageable::status_t Link::ManagementMet
         /* TBD: deprecate this interface in favor of the Broker::create() method.  The
          * Broker::create() method allows the user to assign a name to the bridge.
          */
-        QPID_LOG(warning, "The Link::bridge() method will be removed in a future release
of QPID."
+        QPID_LOG(info, "The Link::bridge() method will be removed in a future release of
QPID."
                  " Please use the Broker::create() method with type='bridge' instead.");
         _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
         QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src <<
@@ -662,11 +664,10 @@ Manageable::status_t Link::ManagementMet
         // existing bridge - this behavior is backward compatible with previous releases.
         Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest,
iargs.i_key);
         if (!bridge) {
-            // need to create a new bridge on this link
-            framing::Uuid uuid(true);
-            const std::string name(QPID_NAME_PREFIX + uuid.str());
+            // need to create a new bridge on this link.
             std::pair<Bridge::shared_ptr, bool> rc =
-              links->declare( name, *this, iargs.i_durable,
+              links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key),
+                              *this, iargs.i_durable,
                               iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
                               iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
                               iargs.i_dynamic, iargs.i_sync);
@@ -752,6 +753,15 @@ void Link::setState(const framing::Field
     }
 }
 
+std::string Link::createName(const std::string& transport,
+                             const std::string& host,
+                             uint16_t  port)
+{
+    stringstream linkName;
+    linkName << QPID_NAME_PREFIX << transport << std::string(":")
+             << host << std::string(":") << port;
+    return linkName.str();
+}
 
 const std::string Link::exchangeTypeName("qpid.LinkExchange");
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.h?rev=1333466&r1=1333465&r2=1333466&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.h Thu May  3 14:12:54 2012
@@ -187,6 +187,11 @@ class Link : public PersistableConfig, p
     // replicate internal state of this Link for clustering
     void getState(framing::FieldTable& state) const;
     void setState(const framing::FieldTable& state);
+
+    /** create a name for a link (if none supplied by user config) */
+    static std::string createName(const std::string& transport,
+                                  const std::string& host,
+                                  uint16_t  port);
 };
 }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message