activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r688378 - in /activemq/activemq-cpp/trunk/src/main/activemq/transport/filters: AsyncSendTransport.cpp AsyncSendTransport.h AsyncSendTransportFactory.cpp
Date Sat, 23 Aug 2008 17:28:18 GMT
Author: tabish
Date: Sat Aug 23 10:28:18 2008
New Revision: 688378

URL: http://svn.apache.org/viewvc?rev=688378&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-189

Adding a max pending message limit property to this AsyncSendTransport so that it can be configured
not to fill all available memory if the broker stalls.

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.cpp?rev=688378&r1=688377&r2=688378&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.cpp Sat Aug 23 10:28:18 2008
@@ -33,6 +33,17 @@
 
     this->closed = true;
     this->asyncThread = NULL;
+    this->maxBacklog = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AsyncSendTransport::AsyncSendTransport( Transport* next, unsigned int maxBacklog, bool own )
+ : TransportFilter( next, own ) {
+
+    std::cout << "Async Transport using max Backlog of :" << maxBacklog << std::endl;
+    this->closed = true;
+    this->asyncThread = NULL;
+    this->maxBacklog = maxBacklog;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -55,6 +66,12 @@
         // in case the client deletes their copy before we get a chance to
         // send it.
         synchronized( &msgQueue ) {
+
+            while( msgQueue.size() >= this->maxBacklog ) {
+                std::cout << "Max Backlog reached" << std::endl;
+                msgQueue.wait();
+            }
+
             msgQueue.push( command->cloneCommand() );
             msgQueue.notifyAll();
         }
@@ -130,6 +147,10 @@
 
                 // get the data
                 command = msgQueue.pop();
+
+                // Notify the callers that we now have room for at least one more
+                // message to send.
+                msgQueue.notifyAll();
             }
 
             // Dispatch the message

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.h?rev=688378&r1=688377&r2=688378&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.h Sat Aug 23 10:28:18 2008
@@ -32,6 +32,14 @@
     private:
 
         /**
+         * Max pending out-bound messages, this limits the number of
+         * messages that will accumulate if the broker has blocked us or
+         * slowed its reads of our out-bound messages.  Default is zero
+         * or unlimited backlog.
+         */
+        unsigned int maxBacklog;
+
+        /**
          * Thread to send messages in when oneway is called.
          */
         decaf::lang::Thread* asyncThread;
@@ -55,6 +63,14 @@
          */
         AsyncSendTransport( Transport* next, bool own = true );
 
+        /**
+         * Constructor.
+         * @param next - the next Transport in the chain
+         * @param maxBacklog - the max number of pending messages to store.
+         * @param own - true if this filter owns the next and should delete it
+         */
+        AsyncSendTransport( Transport* next, unsigned int maxBacklog, bool own = true );
+
         virtual ~AsyncSendTransport();
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp?rev=688378&r1=688377&r2=688378&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp Sat Aug 23 10:28:18 2008
@@ -18,11 +18,14 @@
 #include "AsyncSendTransportFactory.h"
 
 #include <activemq/transport/filters/AsyncSendTransport.h>
+#include <decaf/lang/Integer.h>
 
 using namespace activemq;
 using namespace activemq::transport;
 using namespace activemq::transport::filters;
 using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
 
 ////////////////////////////////////////////////////////////////////////////////
 TransportFactory& AsyncSendTransportFactory::getInstance() {
@@ -42,7 +45,11 @@
     bool own ) throw ( ActiveMQException ) {
 
     try{
-        return new AsyncSendTransport( next, own );
+
+        unsigned int maxBacklog = Integer::parseInt(
+            properties.getProperty( "transport.maxAsyncSendBacklog", "0" ) );
+
+        return new AsyncSendTransport( next, maxBacklog, own );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::Exception, ActiveMQException )



Mime
View raw message