qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1332655 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Link.cpp Link.h
Date Tue, 01 May 2012 13:57:32 GMT
Author: kgiusti
Date: Tue May  1 13:57:31 2012
New Revision: 1332655

URL: http://svn.apache.org/viewvc?rev=1332655&view=rev
Log:
QPID-3963: cleanups from reviewboard input

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1332655&r1=1332654&r2=1332655&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue May  1 13:57:31 2012
@@ -50,6 +50,13 @@ using std::stringstream;
 using std::string;
 namespace _qmf = ::qmf::org::apache::qpid::broker;
 
+
+namespace {
+    const std::string FAILOVER_EXCHANGE("amq.failover");
+    const std::string FAILOVER_HEADER_KEY("amq.failover");
+}
+
+
 struct LinkTimerTask : public sys::TimerTask {
     LinkTimerTask(Link& l, sys::Timer& t)
         : TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval*
@@ -68,11 +75,6 @@ struct LinkTimerTask : public sys::Timer
 };
 
 
-namespace {
-    const std::string FAILOVER_EXCHANGE("amq.failover");
-    const std::string FAILOVER_HEADER_KEY("amq.failover");
-    const framing::ChannelId FAILOVER_CHANNEL(framing::CHANNEL_HIGH_BIT | 1);    // reserved for this link
-}
 
 /** LinkExchange is used by the link to subscribe to the remote broker's amq.failover exchange.
  */
@@ -102,7 +104,7 @@ public:
             urlVec = urlArrayToVector(addresses);
             for(size_t i = 0; i < urlVec.size(); ++i)
                 urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
-            QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls);
+            QPID_LOG(notice, "Remote broker has provided these failover addresses= " << urls);
             link->setUrl(urls);
         }
     }
@@ -147,7 +149,8 @@ Link::Link(LinkRegistry*  _links,
       channelCounter(1),
       connection(0),
       agent(0),
-      timerTask(new LinkTimerTask(*this, broker->getTimer()))
+      timerTask(new LinkTimerTask(*this, broker->getTimer())),
+      failoverChannel(0)
 {
     if (parent != 0 && broker != 0)
     {
@@ -170,9 +173,9 @@ Link::Link(LinkRegistry*  _links,
     _name << "qpid.link." << transport << ":" << host << ":" << port;
     std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(),
                                                                               exchangeTypeName);
-    exchange = boost::static_pointer_cast<LinkExchange>(rc.first);
-    assert(exchange);
-    exchange->setLink(this);
+    failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+    assert(failoverExchange);
+    failoverExchange->setLink(this);
 }
 
 Link::~Link ()
@@ -184,7 +187,7 @@ Link::~Link ()
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
 
-    broker->getExchanges().destroy(exchange->getName());
+    broker->getExchanges().destroy(failoverExchange->getName());
 }
 
 void Link::setStateLH (int newState)
@@ -287,11 +290,12 @@ void Link::opened() {
     // attempt to subscribe to failover exchange for updates from remote
     //
 
-    const std::string queueName = "qpid.link." + exchange->getName();
+    const std::string queueName = "qpid.link." + failoverExchange->getName();
+    failoverChannel = nextChannel();
 
-    SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL);
+    SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
     sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
-    sessionHandler.attachAs(exchange->getName());
+    sessionHandler.attachAs(failoverExchange->getName());
 
     framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
 
@@ -307,15 +311,15 @@ void Link::opened() {
                                     "",     // no key
                                     FieldTable());
     remoteBroker.getMessage().subscribe(queueName,
-                                        exchange->getName(),
+                                        failoverExchange->getName(),
                                         1,           // implied-accept mode
                                         0,           // pre-acquire mode
                                         false,       // exclusive
                                         "",          // resume-id
                                         0,           // resume-ttl
                                         FieldTable());
-    remoteBroker.getMessage().flow(exchange->getName(), 0, 0xFFFFFFFF);
-    remoteBroker.getMessage().flow(exchange->getName(), 1, 0xFFFFFFFF);
+    remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
+    remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
 }
 
 void Link::closed(int, std::string text)
@@ -665,11 +669,11 @@ void Link::closeConnection( const std::s
 {
     if (connection != 0) {
         // cancel our subscription to the failover exchange
-        SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL);
+        SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
         if (sessionHandler.getSession()) {
             framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
-            remoteBroker.getMessage().cancel(exchange->getName());
-            remoteBroker.getSession().detach(exchange->getName());
+            remoteBroker.getMessage().cancel(failoverExchange->getName());
+            remoteBroker.getSession().detach(failoverExchange->getName());
         }
         connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
         connection = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1332655&r1=1332654&r2=1332655&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Tue May  1 13:57:31 2012
@@ -86,7 +86,8 @@ class Link : public PersistableConfig, p
     Connection* connection;
     management::ManagementAgent* agent;
     boost::intrusive_ptr<sys::TimerTask> timerTask;
-    boost::shared_ptr<broker::LinkExchange> exchange;
+    boost::shared_ptr<broker::LinkExchange> failoverExchange;  // subscribed to remote's amq.failover exchange
+    uint failoverChannel;
 
     static const int STATE_WAITING     = 1;
     static const int STATE_CONNECTING  = 2;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message