activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1095746 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: state/ transport/ transport/failover/
Date Thu, 21 Apr 2011 15:15:12 GMT
Author: tabish
Date: Thu Apr 21 15:15:09 2011
New Revision: 1095746

URL: http://svn.apache.org/viewvc?rev=1095746&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-364

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1095746&r1=1095745&r2=1095746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Thu Apr 21 15:15:09 2011
@@ -24,6 +24,7 @@
 #include <activemq/commands/RemoveInfo.h>
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/transport/TransportListener.h>
+#include <activemq/wireformat/WireFormat.h>
 
 using namespace activemq;
 using namespace activemq::core;
@@ -175,10 +176,6 @@ void ConnectionStateTracker::doRestoreTr
 
         for( ; iter != transactionStates.end(); ++iter ) {
 
-            //if( LOG.isDebugEnabled() ) {
-            //    LOG.debug("tx: " + transactionState.getId());
-            //}
-
             // ignore any empty (ack) transaction
             if( (*iter)->getCommands().size() == 2 ) {
                 Pointer<Command> lastCommand = (*iter)->getCommands().get(1);
@@ -186,9 +183,6 @@ void ConnectionStateTracker::doRestoreTr
                     Pointer<TransactionInfo> transactionInfo = lastCommand.dynamicCast<TransactionInfo>();
 
                     if( transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE ) {
-                        //if( LOG.isDebugEnabled() ) {
-                        //    LOG.debug("not replaying empty (ack) tx: " + transactionState.getId());
-                        //}
                         toIgnore.push_back(lastCommand);
                         continue;
                     }
@@ -200,9 +194,6 @@ void ConnectionStateTracker::doRestoreTr
             std::vector< Pointer<ProducerState> >::const_iterator state = producerStates.begin();
 
             for( ; state != producerStates.end(); ++state ) {
-                //if( LOG.isDebugEnabled() ) {
-                //    LOG.debug("tx replay producer :" + producerState.getInfo());
-                //}
                 transport->oneway( (*state)->getInfo() );
             }
 
@@ -215,9 +206,6 @@ void ConnectionStateTracker::doRestoreTr
 
             state = producerStates.begin();
             for( ; state != producerStates.end(); ++state ) {
-                //if( LOG.isDebugEnabled() ) {
-                //    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
-                //}
                 transport->oneway( (*state)->getInfo()->createRemoveCommand() );
             }
         }
@@ -282,8 +270,9 @@ void ConnectionStateTracker::doRestoreCo
         for( ; state != consumerStates.end(); ++state ) {
 
             Pointer<ConsumerInfo> infoToSend = (*state)->getInfo();
+            Pointer<wireformat::WireFormat> wireFormat = transport->getWireFormat();
 
-            if( !connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0) {
+            if( !connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0 && wireFormat->getVersion() > 5) {
 
                 infoToSend.reset( (*state)->getInfo()->cloneDataStructure() );
                 connectionState->getRecoveringPullConsumers().put( infoToSend->getConsumerId(), (*state)->getInfo() );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=1095746&r1=1095745&r2=1095746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h Thu Apr 21 15:15:09 2011
@@ -164,6 +164,10 @@ namespace transport{
          */
         virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
 
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const {
+        	return this->wireFormat;
+        }
+
         virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ){
             this->wireFormat = wireFormat;
         }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h?rev=1095746&r1=1095745&r2=1095746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h Thu Apr 21 15:15:09 2011
@@ -117,6 +117,15 @@ namespace transport{
         virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout ) = 0;
 
         /**
+         * Gets the WireFormat instance that is in use by this transport.  In the case of
+         * nested transport this method delegates down to the lowest level transport that
+         * actually maintains a WireFormat info instance.
+         *
+         * @returns The WireFormat object used to encode / decode commands.
+         */
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const = 0;
+
+        /**
          * Sets the WireFormat instance to use.
          * @param wireFormat
          *      The WireFormat the object used to encode / decode commands.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h?rev=1095746&r1=1095745&r2=1095746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h Thu Apr 21 15:15:09 2011
@@ -129,6 +129,10 @@ namespace transport{
             return this->listener;
         }
 
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const {
+        	return next->getWireFormat();
+        }
+
         virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ) {
             next->setWireFormat( wireFormat );
         }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1095746&r1=1095745&r2=1095746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Thu Apr 21 15:15:09 2011
@@ -917,6 +917,19 @@ void FailoverTransport::processResponse(
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+Pointer<wireformat::WireFormat> FailoverTransport::getWireFormat() const {
+
+	Pointer<wireformat::WireFormat> result;
+	Pointer<Transport> transport = this->connectedTransport;
+
+    if( transport != NULL ) {
+        result = transport->getWireFormat();
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 long long FailoverTransport::getTimeout() const {
     return this->timeout;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1095746&r1=1095745&r2=1095746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Thu Apr 21 15:15:09 2011
@@ -145,6 +145,8 @@ namespace failover {
 
         virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
 
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const;
+
         virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {}
 
         virtual void setTransportListener( TransportListener* listener );



Mime
View raw message