activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r511347 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Date Sat, 24 Feb 2007 21:10:08 GMT
Author: tabish
Date: Sat Feb 24 13:10:07 2007
New Revision: 511347

URL: http://svn.apache.org/viewvc?view=rev&rev=511347
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Fix message acking so that we ack all messages correctly

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=511347&r1=511346&r2=511347
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sat Feb 24 13:10:07 2007
@@ -813,67 +813,64 @@
 {
     try {
 
-        if( session->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE ||
-            session->getAckMode() == cms::Session::SESSION_TRANSACTED ) {
+        const commands::Message* amqMessage =
+            dynamic_cast<const commands::Message*>( message );
 
-            const commands::Message* amqMessage =
-                dynamic_cast<const commands::Message*>( message );
+        if( amqMessage == NULL ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::acknowledge - "
+                "Message was not a commands::Message derivation.");
+        }
 
-            if( amqMessage == NULL ) {
-                throw OpenWireConnectorException(
-                    __FILE__, __LINE__,
-                    "OpenWireConnector::acknowledge - "
-                    "Message was not a commands::Message derivation.");
-            }
+        const OpenWireConsumerInfo* consumerInfo =
+            dynamic_cast<const OpenWireConsumerInfo*>( consumer );
 
-            const OpenWireConsumerInfo* consumerInfo =
-                dynamic_cast<const OpenWireConsumerInfo*>( consumer );
+        if( consumerInfo == NULL ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::acknowledge - "
+                "Consumer was not of the OpenWire flavor.");
+        }
 
-            if( consumerInfo == NULL ) {
+        commands::MessageAck ack;
+        ack.setAckType( (int)ackType );
+        ack.setConsumerId(
+            dynamic_cast<commands::ConsumerId*>(
+                consumerInfo->getConsumerInfo()->
+                    getConsumerId()->cloneDataStructure() ) );
+        ack.setDestination(
+            dynamic_cast<commands::ActiveMQDestination*>(
+                amqMessage->getDestination()->cloneDataStructure() ) );
+        ack.setFirstMessageId(
+            dynamic_cast<commands::MessageId*>(
+                amqMessage->getMessageId()->cloneDataStructure() ) );
+        ack.setLastMessageId(
+            dynamic_cast<commands::MessageId*>(
+                amqMessage->getMessageId()->cloneDataStructure() ) );
+        ack.setMessageCount( 1 );
+
+        if( session->getAckMode() == cms::Session::SESSION_TRANSACTED ) {
+
+            const OpenWireTransactionInfo* transactionInfo =
+                dynamic_cast<const OpenWireTransactionInfo*>(
+                    session->getTransactionInfo() );
+
+            if( transactionInfo == NULL ) {
                 throw OpenWireConnectorException(
                     __FILE__, __LINE__,
                     "OpenWireConnector::acknowledge - "
-                    "Consumer was not of the OpenWire flavor.");
-            }
-
-            commands::MessageAck ack;
-            ack.setAckType( (int)ackType );
-            ack.setConsumerId(
-                dynamic_cast<commands::ConsumerId*>(
-                    consumerInfo->getConsumerInfo()->
-                        getConsumerId()->cloneDataStructure() ) );
-            ack.setDestination(
-                dynamic_cast<commands::ActiveMQDestination*>(
-                    amqMessage->getDestination()->cloneDataStructure() ) );
-            ack.setFirstMessageId(
-                dynamic_cast<commands::MessageId*>(
-                    amqMessage->getMessageId()->cloneDataStructure() ) );
-            ack.setLastMessageId(
-                dynamic_cast<commands::MessageId*>(
-                    amqMessage->getMessageId()->cloneDataStructure() ) );
-            ack.setMessageCount( 1 );
-
-            if( session->getAckMode() == cms::Session::SESSION_TRANSACTED ) {
-
-                const OpenWireTransactionInfo* transactionInfo =
-                    dynamic_cast<const OpenWireTransactionInfo*>(
-                        session->getTransactionInfo() );
-
-                if( transactionInfo == NULL ) {
-                    throw OpenWireConnectorException(
-                        __FILE__, __LINE__,
-                        "OpenWireConnector::acknowledge - "
-                        "Transacted Session, has no Transaction Info.");
-                }
-
-                ack.setTransactionId(
-                    dynamic_cast<commands::TransactionId*>(
-                        transactionInfo->getTransactionInfo()->
-                            getTransactionId()->cloneDataStructure() ) );
+                    "Transacted Session, has no Transaction Info.");
             }
 
-            oneway( &ack );
+            ack.setTransactionId(
+                dynamic_cast<commands::TransactionId*>(
+                    transactionInfo->getTransactionInfo()->
+                        getTransactionId()->cloneDataStructure() ) );
         }
+
+        oneway( &ack );
+
     } catch( ConnectorException& ex ){
         try{ transport->close(); } catch( ... ){}
 



Mime
View raw message