activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r807386 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/wireformat/ main/activemq/wireformat/openwire/ main/activemq/wireformat/stomp/ test/activemq/transport/
Date Mon, 24 Aug 2009 21:19:06 GMT
Author: tabish
Date: Mon Aug 24 21:19:05 2009
New Revision: 807386

URL: http://svn.apache.org/viewvc?rev=807386&view=rev
Log:
Add the ability to query the WireFormat object to see if its in an unmarshal call so that
when large messages are processed the inactivity monitor can determine the WireFormat is busy
or the channel is really inactive.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormat.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/StompWireFormat.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/StompWireFormat.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormat.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormat.h?rev=807386&r1=807385&r2=807386&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormat.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormat.h Mon
Aug 24 21:19:05 2009
@@ -101,6 +101,17 @@
         virtual bool hasNegotiator() const = 0;
 
         /**
+         * Indicates if the WireFromat object is in the process of receiving a message. 
This
+         * is useful for monitoring inactivity and the WireFormat is processing a large message
+         * which takes longer than some configured timeout to unmarshal, the inactivity monitor
+         * can query the WireFormat instance to determine if its busy or not and not mark
the
+         * connection as inactive if so.
+         *
+         * @returns true if the WireFormat object is unmarshaling a message.
+         */
+        virtual bool inReceive() const = 0;
+
+        /**
          * If the Transport Provides a Negotiator this method will create and return
          * a new instance of the Negotiator.
          *

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp?rev=807386&r1=807385&r2=807386&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
Mon Aug 24 21:19:05 2009
@@ -292,6 +292,22 @@
 
     try {
 
+        class Finally {
+        private:
+
+            decaf::util::concurrent::atomic::AtomicBoolean* state;
+
+        public:
+
+            Finally( decaf::util::concurrent::atomic::AtomicBoolean* state ) : state( state
) {
+                state->set( true );
+            }
+
+            ~Finally() {
+                state->set( false );
+            }
+        } finalizer( &( this->receiving ) );
+
         unsigned char dataType = dis->readByte();
 
         if( dataType != NULL_TYPE ) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h?rev=807386&r1=807385&r2=807386&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h
Mon Aug 24 21:19:05 2009
@@ -25,6 +25,7 @@
 #include <activemq/wireformat/openwire/utils/BooleanStream.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/Properties.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/IllegalStateException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
 #include <memory>
@@ -62,6 +63,9 @@
         // Uniquely Generated ID, initialize in the Ctor
         std::string id;
 
+        // Indicates when we are in the doUnmarshal call
+        decaf::util::concurrent::atomic::AtomicBoolean receiving;
+
         // WireFormat Data
         int version;
         bool stackTraceEnabled;
@@ -273,6 +277,15 @@
         void setVersion( int version ) throw ( decaf::lang::exceptions::IllegalArgumentException
);
 
         /**
+         * Is there a Message being unmarshaled?
+         *
+         * @return true while in the doUnmarshal method.
+         */
+        virtual bool inReceive() const {
+            return this->receiving.get();
+        }
+
+        /**
          * Checks if the cacheEnabled flag is on
          * @return true if the flag is on.
          */

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/StompWireFormat.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/StompWireFormat.cpp?rev=807386&r1=807385&r2=807386&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/StompWireFormat.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/StompWireFormat.cpp
Mon Aug 24 21:19:05 2009
@@ -156,6 +156,22 @@
         // Return the Command.
         const std::string commandId = frame->getCommand();
 
+        class Finally {
+        private:
+
+            decaf::util::concurrent::atomic::AtomicBoolean* state;
+
+        public:
+
+            Finally( decaf::util::concurrent::atomic::AtomicBoolean* state ) : state( state
) {
+                state->set( true );
+            }
+
+            ~Finally() {
+                state->set( false );
+            }
+        } finalizer( &( this->receiving ) );
+
         if( commandId == StompCommandConstants::CONNECTED ){
             return this->unmarshalConnected( frame );
         } else if( commandId == StompCommandConstants::ERROR_CMD ){

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/StompWireFormat.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/StompWireFormat.h?rev=807386&r1=807385&r2=807386&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/StompWireFormat.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/StompWireFormat.h
Mon Aug 24 21:19:05 2009
@@ -22,6 +22,7 @@
 #include <activemq/wireformat/WireFormat.h>
 #include <activemq/wireformat/stomp/StompFrame.h>
 #include <activemq/wireformat/stomp/StompHelper.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/io/IOException.h>
 #include <decaf/lang/Pointer.h>
 
@@ -44,6 +45,9 @@
         // and unsubscribe are set to use the client Id.
         std::string clientId;
 
+        // Indicates when we are in the doUnmarshal call
+        decaf::util::concurrent::atomic::AtomicBoolean receiving;
+
     public:
 
         StompWireFormat();
@@ -96,6 +100,15 @@
         }
 
         /**
+         * Is there a Message being unmarshaled?
+         *
+         * @return true while in the doUnmarshal method.
+         */
+        virtual bool inReceive() const {
+            return this->receiving.get();
+        }
+
+        /**
          * Returns true if this WireFormat has a Negotiator that needs to wrap the
          * Transport that uses it.
          * @returns true if the WireFormat provides a Negotiator.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.cpp?rev=807386&r1=807385&r2=807386&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.cpp
Mon Aug 24 21:19:05 2009
@@ -73,6 +73,8 @@
 
     virtual int getVersion() const { return 0; }
 
+    virtual bool inReceive() const { return false; }
+
     virtual bool hasNegotiator() const { return false; }
 
     virtual Pointer<Transport> createNegotiator(



Mime
View raw message