qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject svn commit: r1489458 - in /qpid/trunk/qpid/cpp/src/qpid: amqp_0_10/ broker/ broker/amqp/ messaging/amqp/ sys/
Date Tue, 04 Jun 2013 14:27:55 GMT
Author: astitcher
Date: Tue Jun  4 14:27:55 2013
New Revision: 1489458

URL: http://svn.apache.org/r1489458
Log:
QPID-4854: Make the protocol negotiation timeout actually relate to
the protocol negotiation!

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
    qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
    qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h
    qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Tue Jun  4 14:27:55 2013
@@ -118,6 +118,7 @@ size_t  Connection::encode(char* buffer,
 }
 
 void Connection::abort() { output.abort(); }
+void Connection::connectionEstablished() { output.connectionEstablished(); }
 void Connection::activateOutput() { output.activateOutput(); }
 
 void  Connection::close() {

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Tue Jun  4 14:27:55 2013
@@ -65,6 +65,7 @@ class Connection  : public sys::Connecti
     bool isClosed() const;
     bool canEncode();
     void abort();
+    void connectionEstablished();
     void activateOutput();
     void closed();              // connection closed by peer.
     void close();               // closing from this end.

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jun  4 14:27:55 2013
@@ -456,6 +456,7 @@ void Connection::setHeartbeatInterval(ui
             timer.add(timeoutTimer);
         }
     }
+    out.connectionEstablished();
 }
 
 void Connection::startLinkHeartbeatTimeoutTask() {
@@ -463,6 +464,7 @@ void Connection::startLinkHeartbeatTimeo
         linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this);
         timer.add(linkHeartbeatTimer);
     }
+    out.connectionEstablished();
 }
 
 void Connection::restartTimeout()
@@ -480,6 +482,7 @@ bool Connection::isOpen() { return adapt
 Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con),
next(0) {}
 void Connection::OutboundFrameTracker::close() { next->close(); }
 void Connection::OutboundFrameTracker::abort() { next->abort(); }
+void Connection::OutboundFrameTracker::connectionEstablished() { next->connectionEstablished();
}
 void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
 void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jun  4 14:27:55 2013
@@ -189,6 +189,7 @@ class Connection : public sys::Connectio
         OutboundFrameTracker(Connection&);
         void close();
         void abort();
+        void connectionEstablished();
         void activateOutput();
         void send(framing::AMQFrame&);
         void wrap(sys::ConnectionOutputHandlerPtr&);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Tue Jun  4 14:27:55 2013
@@ -181,6 +181,7 @@ void Connection::process()
         pn_connection_set_container(connection, broker.getFederationTag().c_str());
         setContainerId(pn_connection_remote_container(connection));
         pn_connection_open(connection);
+        out.connectionEstablished();
     }
 
     for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s,
REQUIRES_OPEN)) {

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h Tue Jun  4 14:27:55 2013
@@ -48,6 +48,7 @@ class SslTransport : public Transport
 
     void activateOutput();
     void abort();
+    void connectionEstablished() {};
     void close();
 
   private:

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h Tue Jun  4 14:27:55 2013
@@ -48,6 +48,7 @@ class TcpTransport : public Transport
 
     void activateOutput();
     void abort();
+    void connectionEstablished() {};
     void close();
 
   private:

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Tue Jun  4 14:27:55 2013
@@ -100,6 +100,13 @@ void AsynchIOHandler::abort() {
     aio->queueWriteClose();
 }
 
+void AsynchIOHandler::connectionEstablished() {
+    if (timeoutTimerTask) {
+        timeoutTimerTask->cancel();
+        timeoutTimerTask.reset();
+    }
+}
+
 void AsynchIOHandler::activateOutput() {
     aio->notifyPendingWrite();
 }
@@ -123,13 +130,6 @@ void AsynchIOHandler::readbuff(AsynchIO&
     if (codec) {                // Already initiated
         try {
             decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
-            // When we've decoded 3 reads (probably frames) we will have authenticated and
-            // started heartbeats, if specified, in many (but not all) cases so now we will
cancel
-            // the idle connection timeout - this is really hacky, and would be better implemented
-            // in the connection, but that isn't actually created until the first decode.
-            if (reads == 3) {
-                timeoutTimerTask->cancel();
-            }
         }catch(const std::exception& e){
             QPID_LOG(error, e.what());
             readError = true;
@@ -203,10 +203,6 @@ void AsynchIOHandler::idle(AsynchIO&){
     if (isClient && codec == 0) {
         codec = factory->create(*this, identifier, getSecuritySettings(aio, nodict));
         write(framing::ProtocolInitiation(codec->getVersion()));
-        // We've just sent the protocol negotiation so we can cancel the timeout for that
-        // This is not ideal, because we've not received anything yet, but heartbeats will
-        // be active soon
-        timeoutTimerTask->cancel();
         return;
     }
     if (codec == 0) return;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Tue Jun  4 14:27:55 2013
@@ -63,6 +63,7 @@ class AsynchIOHandler : public OutputCon
 
     // Output side
     QPID_COMMON_EXTERN void abort();
+    QPID_COMMON_EXTERN void connectionEstablished();
     QPID_COMMON_EXTERN void activateOutput();
 
     // Input side

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Tue Jun  4 14:27:55 2013
@@ -43,6 +43,7 @@ class ConnectionOutputHandlerPtr : publi
 
     void close() { next->close(); }
     void abort() { next->abort(); }
+    void connectionEstablished() { next->connectionEstablished(); }
     void activateOutput() { next->activateOutput(); }
     void send(framing::AMQFrame& f) { next->send(f); }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h Tue Jun  4 14:27:55 2013
@@ -32,6 +32,7 @@ namespace sys {
     public:
         virtual ~OutputControl() {}
         virtual void abort() = 0;
+        virtual void connectionEstablished() = 0;
         virtual void activateOutput() = 0;
     };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue Jun  4 14:27:55 2013
@@ -67,6 +67,7 @@ class RdmaIOHandler : public OutputContr
     // Output side
     void close();
     void abort();
+    void connectionEstablished();
     void activateOutput();
     void initProtocolOut();
 
@@ -131,6 +132,10 @@ void RdmaIOHandler::close() {
 void RdmaIOHandler::abort() {
 }
 
+// TODO: Dummy implementation, need to fill this in for connection establishment timeout
to work
+void RdmaIOHandler::connectionEstablished() {
+}
+
 void RdmaIOHandler::activateOutput() {
     aio->notifyPendingWrite();
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message