activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r508802 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire: OpenWireConnector.cpp OpenWireConnector.h OpenWireTransactionInfo.h
Date Sat, 17 Feb 2007 20:04:37 GMT
Author: tabish
Date: Sat Feb 17 12:04:36 2007
New Revision: 508802

URL: http://svn.apache.org/viewvc?view=rev&rev=508802
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Add Transaction Processing

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireTransactionInfo.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=508802&r1=508801&r2=508802
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sat Feb 17 12:04:36 2007
@@ -32,27 +32,28 @@
 #include <activemq/connector/openwire/BrokerException.h>
 #include <activemq/connector/openwire/OpenWireFormatFactory.h>
 
+#include <activemq/connector/openwire/commands/ActiveMQMessage.h>
+#include <activemq/connector/openwire/commands/ActiveMQBytesMessage.h>
+#include <activemq/connector/openwire/commands/ActiveMQTextMessage.h>
+#include <activemq/connector/openwire/commands/ActiveMQMapMessage.h>
+#include <activemq/connector/openwire/commands/ActiveMQTopic.h>
+#include <activemq/connector/openwire/commands/ActiveMQQueue.h>
 #include <activemq/connector/openwire/commands/ActiveMQTempTopic.h>
 #include <activemq/connector/openwire/commands/ActiveMQTempQueue.h>
+#include <activemq/connector/openwire/commands/BrokerInfo.h>
+#include <activemq/connector/openwire/commands/BrokerError.h>
 #include <activemq/connector/openwire/commands/ConnectionId.h>
 #include <activemq/connector/openwire/commands/DestinationInfo.h>
-#include <activemq/connector/openwire/commands/RemoveInfo.h>
-#include <activemq/connector/openwire/commands/ShutdownInfo.h>
-#include <activemq/connector/openwire/commands/SessionInfo.h>
+#include <activemq/connector/openwire/commands/ExceptionResponse.h>
 #include <activemq/connector/openwire/commands/Message.h>
 #include <activemq/connector/openwire/commands/MessageAck.h>
 #include <activemq/connector/openwire/commands/MessageDispatch.h>
+#include <activemq/connector/openwire/commands/RemoveInfo.h>
+#include <activemq/connector/openwire/commands/ShutdownInfo.h>
+#include <activemq/connector/openwire/commands/SessionInfo.h>
+#include <activemq/connector/openwire/commands/TransactionInfo.h>
+#include <activemq/connector/openwire/commands/LocalTransactionId.h>
 #include <activemq/connector/openwire/commands/WireFormatInfo.h>
-#include <activemq/connector/openwire/commands/BrokerInfo.h>
-#include <activemq/connector/openwire/commands/BrokerError.h>
-#include <activemq/connector/openwire/commands/ActiveMQTopic.h>
-#include <activemq/connector/openwire/commands/ActiveMQQueue.h>
-#include <activemq/connector/openwire/commands/ExceptionResponse.h>
-#include <activemq/connector/openwire/commands/BrokerError.h>
-#include <activemq/connector/openwire/commands/ActiveMQMessage.h>
-#include <activemq/connector/openwire/commands/ActiveMQBytesMessage.h>
-#include <activemq/connector/openwire/commands/ActiveMQTextMessage.h>
-#include <activemq/connector/openwire/commands/ActiveMQMapMessage.h>
 
 using namespace std;
 using namespace activemq;
@@ -359,38 +360,38 @@
 {
     OpenWireConsumerInfo* consumer = NULL;
     commands::ConsumerInfo* consumerInfo = NULL;
-    
+
     try
     {
         enforceConnected();
 
         consumer = new OpenWireConsumerInfo();
-        consumerInfo = createConsumerInfo( destination, session ); 
+        consumerInfo = createConsumerInfo( destination, session );
         consumer->setConsumerInfo( consumerInfo );
 
         consumerInfo->setSelector( selector );
-        consumerInfo->setNoLocal( noLocal );       
-        
+        consumerInfo->setNoLocal( noLocal );
+
         // Send the message to the broker.
         Response* response = syncRequest(consumerInfo);
-        
+
         // The broker did not return an error - this is good.
         // Just discard the response.
         delete response;
 
         return consumer;
-        
+
     } catch( ConnectorException& ex ) {
         delete consumer;
         delete consumerInfo;
-        
+
         ex.setMark( __FILE__, __LINE__ );
         throw ex;
     } catch( ... ) {
         delete consumer;
         delete consumerInfo;
-        
-        throw OpenWireConnectorException( __FILE__, __LINE__, 
+
+        throw OpenWireConnectorException( __FILE__, __LINE__,
             "caught unknown exception" );
     }
 }
@@ -406,39 +407,39 @@
 {
     OpenWireConsumerInfo* consumer = NULL;
     commands::ConsumerInfo* consumerInfo = NULL;
-    
+
     try
     {
         enforceConnected();
 
         consumer = new OpenWireConsumerInfo();
-        consumerInfo = createConsumerInfo( topic, session ); 
+        consumerInfo = createConsumerInfo( topic, session );
         consumer->setConsumerInfo( consumerInfo );
 
         consumerInfo->setSelector( selector );
-        consumerInfo->setNoLocal( noLocal );        
-        consumerInfo->setSubscriptionName( name );        
-        
+        consumerInfo->setNoLocal( noLocal );
+        consumerInfo->setSubscriptionName( name );
+
         // Send the message to the broker.
         Response* response = syncRequest(consumerInfo);
-        
+
         // The broker did not return an error - this is good.
         // Just discard the response.
         delete response;
 
         return consumer;
-        
+
     } catch( ConnectorException& ex ) {
         delete consumer;
         delete consumerInfo;
-        
+
         ex.setMark( __FILE__, __LINE__ );
         throw ex;
     } catch( ... ) {
         delete consumer;
         delete consumerInfo;
-        
-        throw OpenWireConnectorException( __FILE__, __LINE__, 
+
+        throw OpenWireConnectorException( __FILE__, __LINE__,
             "caught unknown exception" );
     }
 }
@@ -448,9 +449,9 @@
     const cms::Destination* destination,
     connector::SessionInfo* session )
         throw ( ConnectorException )
-{   
+{
     commands::ConsumerInfo* consumerInfo = NULL;
-    
+
     try
     {
         consumerInfo = new commands::ConsumerInfo();
@@ -832,22 +833,29 @@
 
         enforceConnected();
 
-//        TransactionInfo* transaction = new StompTransactionInfo();
-//
-//        transaction->setTransactionId( getNextTransactionId() );
-//
-//        session->setTransactionInfo( transaction );
-//
-//        BeginCommand cmd;
-//
-//        cmd.setTransactionId(
-//                Integer::toString( transaction->getTransactionId() ) );
-//
-//        transport->oneway( &cmd );
-//
-//        return transaction;
+        OpenWireTransactionInfo* transaction =
+            new OpenWireTransactionInfo();
 
-        return NULL;
+        // Place Transaction Data in session for later use as well as
+        // the session in the Transaction Data
+        session->setTransactionInfo( transaction );
+        transaction->setSessionInfo( session );
+
+        // Prepare and send the Transaction command
+        commands::TransactionInfo* info = new commands::TransactionInfo();
+
+        info->setConnectionId(
+            dynamic_cast<commands::ConnectionId*>(
+                connectionInfo.getConnectionId()->cloneDataStructure() ) );
+        info->setTransactionId( createLocalTransactionId() );
+        info->setType( (int)TRANSACTION_BEGIN );
+
+        transport->oneway( info );
+
+        // Store for later
+        transaction->setTransactionInfo( info );
+
+        return transaction;
     }
     catch( CommandIOException& ex ){
         transport->close();
@@ -861,19 +869,29 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void OpenWireConnector::commit( TransactionInfo* transaction,
-                                connector::SessionInfo* session )
+                                SessionInfo* session AMQCPP_UNUSED )
     throw ( ConnectorException )
 {
     try
     {
         enforceConnected();
 
-//        CommitCommand cmd;
-//
-//        cmd.setTransactionId(
-//                Integer::toString( transaction->getTransactionId() ) );
-//
-//        transport->oneway( &cmd );
+        OpenWireTransactionInfo* transactionInfo =
+            dynamic_cast<OpenWireTransactionInfo*>( transaction );
+
+        if( transactionInfo == NULL ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::commit - "
+                "Transaction was not of the OpenWire flavor.");
+        }
+
+        commands::TransactionInfo* info =
+            transactionInfo->getTransactionInfo();
+
+        info->setType( (int)TRANSACTION_COMMITONEPHASE );
+
+        transport->oneway( info );
     }
     catch( CommandIOException& ex ){
         transport->close();
@@ -886,19 +904,29 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void OpenWireConnector::rollback( TransactionInfo* transaction,
-                                  connector::SessionInfo* session )
+                                  SessionInfo* session AMQCPP_UNUSED )
     throw ( ConnectorException )
 {
     try
     {
         enforceConnected();
 
-//        AbortCommand cmd;
-//
-//        cmd.setTransactionId(
-//                Integer::toString( transaction->getTransactionId() ) );
-//
-//        transport->oneway( &cmd );
+        OpenWireTransactionInfo* transactionInfo =
+            dynamic_cast<OpenWireTransactionInfo*>( transaction );
+
+        if( transactionInfo == NULL ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::commit - "
+                "Transaction was not of the OpenWire flavor.");
+        }
+
+        commands::TransactionInfo* info =
+            transactionInfo->getTransactionInfo();
+
+        info->setType( (int)TRANSACTION_ROLLBACK );
+
+        transport->oneway( info );
     }
     catch( CommandIOException& ex ){
         transport->close();
@@ -1229,4 +1257,18 @@
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( OpenWireConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+commands::TransactionId* OpenWireConnector::createLocalTransactionId()
+    throw ( ConnectorException ) {
+
+    commands::LocalTransactionId* id = new commands::LocalTransactionId();
+
+    id->setConnectionId(
+        dynamic_cast<commands::ConnectionId*>(
+            connectionInfo.getConnectionId()->cloneDataStructure() ) );
+    id->setValue( getNextTransactionId() );
+
+    return id;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=508802&r1=508801&r2=508802
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
Sat Feb 17 12:04:36 2007
@@ -54,11 +54,12 @@
 #include <activemq/connector/openwire/OpenWireFormat.h>
 #include <activemq/connector/openwire/OpenWireFormatNegotiator.h>
 
+#include <activemq/connector/openwire/commands/ActiveMQTempDestination.h>
+#include <activemq/connector/openwire/commands/BrokerInfo.h>
 #include <activemq/connector/openwire/commands/ConnectionInfo.h>
 #include <activemq/connector/openwire/commands/ConsumerInfo.h>
-#include <activemq/connector/openwire/commands/BrokerInfo.h>
+#include <activemq/connector/openwire/commands/LocalTransactionId.h>
 #include <activemq/connector/openwire/commands/WireFormatInfo.h>
-#include <activemq/connector/openwire/commands/ActiveMQTempDestination.h>
 
 namespace activemq{
 namespace connector{
@@ -79,6 +80,19 @@
             CONNECTED
         };
 
+        // Enumeration of Transaction State flags
+        enum TransactionType
+        {
+            TRANSACTION_BEGIN = 0,
+            TRANSACTION_PREPARE = 1,
+            TRANSACTION_COMMITONEPHASE = 2,
+            TRANSACTION_COMMITTWOPHASE = 3,
+            TRANSACTION_ROLLBACK = 4,
+            TRANSACTION_RECOVER = 5,
+            TRANSACTION_FORGET = 6,
+            TRANSACTION_END = 7
+        };
+
     private:
 
         /**
@@ -612,7 +626,7 @@
          */
         std::string createTemporaryDestinationName()
             throw ( ConnectorException );
-            
+
         /**
          * Creates a commands::ConsumerInfo object.  Used for both standard
          * and durable consumers.
@@ -622,6 +636,14 @@
         commands::ConsumerInfo* createConsumerInfo(
             const cms::Destination* destination,
             connector::SessionInfo* session ) throw ( ConnectorException );
+
+        /**
+         * Create a Transaction Id using the local context to create
+         * the LocalTransactionId Command.
+         * @returns a new TransactionId pointer, caller owns.
+         */
+        commands::TransactionId* createLocalTransactionId()
+            throw ( ConnectorException );
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireTransactionInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireTransactionInfo.h?view=diff&rev=508802&r1=508801&r2=508802
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireTransactionInfo.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireTransactionInfo.h
Sat Feb 17 12:04:36 2007
@@ -43,7 +43,9 @@
             session = NULL;
         }
 
-        virtual ~OpenWireTransactionInfo() {}
+        virtual ~OpenWireTransactionInfo() {
+            delete transactionInfo;
+        }
 
         /**
          * Gets the Transction Id
@@ -102,6 +104,7 @@
          * @param transactionInfo - the TransactionInfo for this Session.
          */
         virtual void setTransactionInfo( commands::TransactionInfo* transactionInfo ) {
+            delete transactionInfo;
             this->transactionInfo = transactionInfo;
         }
 



Mime
View raw message