qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject svn commit: r1440616 - in /qpid/trunk/qpid/cpp/src/qpid: amqp_0_10/ broker/ messaging/amqp/ sys/
Date Wed, 30 Jan 2013 19:47:55 GMT
Author: astitcher
Date: Wed Jan 30 19:47:54 2013
New Revision: 1440616

URL: http://svn.apache.org/viewvc?rev=1440616&view=rev
Log:
QPID-4514: Remove IO readCredit throttling only used by removed cluster code

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
    qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
    qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
    qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h
    qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Wed Jan 30 19:47:54 2013
@@ -119,7 +119,6 @@ size_t  Connection::encode(char* buffer,
 
 void Connection::abort() { output.abort(); }
 void Connection::activateOutput() { output.activateOutput(); }
-void Connection::giveReadCredit(int32_t credit) { output.giveReadCredit(credit); }
 
 void  Connection::close() {
     // No more frames can be pushed onto the queue.

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Wed Jan 30 19:47:54 2013
@@ -66,7 +66,6 @@ class Connection  : public sys::Connecti
     bool canEncode();
     void abort();
     void activateOutput();
-    void giveReadCredit(int32_t);
     void closed();              // connection closed by peer.
     void close();               // closing from this end.
     void send(framing::AMQFrame&);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed Jan 30 19:47:54 2013
@@ -481,7 +481,6 @@ void Connection::OutboundFrameTracker::c
 size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); }
 void Connection::OutboundFrameTracker::abort() { next->abort(); }
 void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
-void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
 void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
 {
     next->send(f);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Wed Jan 30 19:47:54 2013
@@ -199,7 +199,6 @@ class Connection : public sys::Connectio
         size_t getBuffered() const;
         void abort();
         void activateOutput();
-        void giveReadCredit(int32_t credit);
         void send(framing::AMQFrame&);
         void wrap(sys::ConnectionOutputHandlerPtr&);
       private:

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Jan 30 19:47:54 2013
@@ -143,11 +143,6 @@ void SessionState::activateOutput() {
         getConnection().outputTasks.activateOutput();
 }
 
-void SessionState::giveReadCredit(int32_t credit) {
-    if (isAttached())
-        getConnection().outputTasks.giveReadCredit(credit);
-}
-
 ManagementObject::shared_ptr SessionState::GetManagementObject(void) const
 {
     return mgmtObject;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed Jan 30 19:47:54 2013
@@ -99,7 +99,6 @@ class SessionState : public qpid::Sessio
     /** OutputControl **/
     void abort();
     void activateOutput();
-    void giveReadCredit(int32_t);
 
     void senderCompleted(const framing::SequenceSet& ranges);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h Wed Jan 30 19:47:54 2013
@@ -49,7 +49,7 @@ class SslTransport : public Transport
     void activateOutput();
     void abort();
     void close();
-    void giveReadCredit(int32_t) {}
+
   private:
     qpid::sys::ssl::SslSocket socket;
     TransportContext& context;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h Wed Jan 30 19:47:54 2013
@@ -49,7 +49,6 @@ class TcpTransport : public Transport
     void activateOutput();
     void abort();
     void close();
-    void giveReadCredit(int32_t) {}
 
   private:
     boost::scoped_ptr<qpid::sys::Socket> socket;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp Wed Jan 30 19:47:54 2013
@@ -32,8 +32,6 @@ void AggregateOutput::abort() { control.
 
 void AggregateOutput::activateOutput() { control.activateOutput(); }
 
-void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
-
 namespace {
 // Clear the busy flag and notify waiting threads in destructor.
 struct ScopedBusy {

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h Wed Jan 30 19:47:54 2013
@@ -59,7 +59,6 @@ class QPID_COMMON_CLASS_EXTERN Aggregate
     // These may be called concurrently with any function.
     QPID_COMMON_EXTERN void abort();
     QPID_COMMON_EXTERN void activateOutput();
-    QPID_COMMON_EXTERN void giveReadCredit(int32_t);
     QPID_COMMON_EXTERN void addOutputTask(OutputTask* t);
 
     // These functions must not be called concurrently with each other.

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Wed Jan 30 19:47:54 2013
@@ -59,8 +59,7 @@ AsynchIOHandler::AsynchIOHandler(const s
     reads(0),
     readError(false),
     isClient(isClient0),
-    nodict(nodict0),
-    readCredit(InfiniteCredit)
+    nodict(nodict0)
 {}
 
 AsynchIOHandler::~AsynchIOHandler() {
@@ -104,21 +103,6 @@ void AsynchIOHandler::activateOutput() {
     aio->notifyPendingWrite();
 }
 
-// Input side
-void AsynchIOHandler::giveReadCredit(int32_t credit) {
-    // Check whether we started in the don't about credit state
-    if (readCredit.boolCompareAndSwap(InfiniteCredit, credit))
-        return;
-    // TODO In theory should be able to use an atomic operation before taking the lock
-    // but in practice there seems to be an unexplained race in that case
-    ScopedLock<Mutex> l(creditLock);
-    if (readCredit.fetchAndAdd(credit) != 0)
-        return;
-    assert(readCredit.get() >= 0);
-    if (readCredit.get() != 0)
-        aio->startReading();
-}
-
 namespace {
     SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict)
     {
@@ -133,26 +117,6 @@ void AsynchIOHandler::readbuff(AsynchIO&
         return;
     }
 
-    // Check here for read credit
-    if (readCredit.get() != InfiniteCredit) {
-        if (readCredit.get() == 0) {
-            // FIXME aconway 2009-10-01:  Workaround to avoid "false wakeups".
-            // readbuff is sometimes called with no credit.
-            // This should be fixed somewhere else to avoid such calls.
-            aio->unread(buff);
-            return;
-        }
-        // TODO In theory should be able to use an atomic operation before taking the lock
-        // but in practice there seems to be an unexplained race in that case
-        ScopedLock<Mutex> l(creditLock);
-        if (--readCredit == 0) {
-            assert(readCredit.get() >= 0);
-            if (readCredit.get() == 0) {
-                aio->stopReading();
-            }
-        }
-    }
-
     ++reads;
     size_t decoded = 0;
     if (codec) {                // Already initiated

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Wed Jan 30 19:47:54 2013
@@ -52,9 +52,6 @@ class AsynchIOHandler : public OutputCon
     bool readError;
     bool isClient;
     bool nodict;
-    AtomicValue<int32_t> readCredit;
-    static const int32_t InfiniteCredit = -1;
-    Mutex creditLock;
     boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
 
     void write(const framing::ProtocolInitiation&);
@@ -67,7 +64,6 @@ class AsynchIOHandler : public OutputCon
     // Output side
     QPID_COMMON_EXTERN void abort();
     QPID_COMMON_EXTERN void activateOutput();
-    QPID_COMMON_EXTERN void giveReadCredit(int32_t credit);
 
     // Input side
     QPID_COMMON_EXTERN void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Wed Jan 30 19:47:54 2013
@@ -45,7 +45,6 @@ class ConnectionOutputHandlerPtr : publi
     size_t getBuffered() const { return next->getBuffered(); }
     void abort() { next->abort(); }
     void activateOutput() { next->activateOutput(); }
-    void giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
     void send(framing::AMQFrame& f) { next->send(f); }
 
   private:

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h Wed Jan 30 19:47:54 2013
@@ -33,7 +33,6 @@ namespace sys {
         virtual ~OutputControl() {}
         virtual void abort() = 0;
         virtual void activateOutput() = 0;
-        virtual void giveReadCredit(int32_t credit) = 0;
     };
 
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1440616&r1=1440615&r2=1440616&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Wed Jan 30 19:47:54 2013
@@ -68,7 +68,6 @@ class RdmaIOHandler : public OutputContr
     void close();
     void abort();
     void activateOutput();
-    void giveReadCredit(int32_t credit);
     void initProtocolOut();
 
     // Input side
@@ -200,10 +199,6 @@ void RdmaIOHandler::full(Rdma::AsynchIO&
     QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
 }
 
-// TODO: Dummy implementation of read throttling
-void RdmaIOHandler::giveReadCredit(int32_t) {
-}
-
 // The logic here is subtly different from TCP as RDMA is message oriented
 // so we define that an RDMA message is a frame - in this case there is no putting back
 // of any message remainder - there shouldn't be any. And what we read here can't be



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message