activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r508772 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire: OpenWireConnector.cpp OpenWireConnector.h
Date Sat, 17 Feb 2007 15:44:58 GMT
Author: tabish
Date: Sat Feb 17 07:44:58 2007
New Revision: 508772

URL: http://svn.apache.org/viewvc?view=rev&rev=508772
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=508772&r1=508771&r2=508772
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sat Feb 17 07:44:58 2007
@@ -21,6 +21,7 @@
 #include <activemq/transport/Transport.h>
 #include <activemq/exceptions/UnsupportedOperationException.h>
 #include <activemq/util/Integer.h>
+#include <activemq/util/Long.h>
 #include <activemq/util/Guid.h>
 #include <activemq/connector/openwire/OpenWireConnectorException.h>
 #include <activemq/connector/openwire/OpenWireSessionInfo.h>
@@ -30,7 +31,10 @@
 #include <activemq/connector/openwire/BrokerException.h>
 #include <activemq/connector/openwire/OpenWireFormatFactory.h>
 
+#include <activemq/connector/openwire/commands/ActiveMQTempTopic.h>
+#include <activemq/connector/openwire/commands/ActiveMQTempQueue.h>
 #include <activemq/connector/openwire/commands/ConnectionId.h>
+#include <activemq/connector/openwire/commands/DestinationInfo.h>
 #include <activemq/connector/openwire/commands/RemoveInfo.h>
 #include <activemq/connector/openwire/commands/ShutdownInfo.h>
 #include <activemq/connector/openwire/commands/SessionInfo.h>
@@ -75,6 +79,7 @@
     this->nextProducerId = 1;
     this->nextTransactionId = 1;
     this->nextSessionId = 1;
+    this->nextTempDestinationId = 1;
     this->properties.copy( &properties );
     this->wireFormat = dynamic_cast<OpenWireFormat*>(
         wireFormatFactory.createWireFormat( properties ) );
@@ -154,6 +159,17 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+long long OpenWireConnector::getNextTempDestinationId()  // Hands out the next temporary-destination id and advances the counter.
+{
+    synchronized( &mutex )  // the counter is read and bumped inside this synchronized( &mutex ) block
+    {
+        return nextTempDestinationId++;  // post-increment: the caller receives the value from before the bump
+    }
+
+    return 0;  // NOTE(review): presumably never reached; likely here so every path returns a value - confirm against the synchronized macro
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void OpenWireConnector::enforceConnected() throw ( ConnectorException )
 {
     if( state != CONNECTED )
@@ -473,15 +489,20 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::TemporaryTopic* OpenWireConnector::createTemporaryTopic(
-    connector::SessionInfo* session )
+    connector::SessionInfo* session AMQCPP_UNUSED )
         throw ( ConnectorException )
 {
     try
     {
         enforceConnected();
 
-        // TODO
-        return NULL;
+        commands::ActiveMQTempTopic* topic = new
+            commands::ActiveMQTempTopic( createTemporaryDestinationName() );
+
+        // Register it with the Broker
+        this->createTemporaryDestination( topic );
+
+        return topic;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( OpenWireConnectorException )
@@ -489,15 +510,20 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::TemporaryQueue* OpenWireConnector::createTemporaryQueue(
-    connector::SessionInfo* session )
+    connector::SessionInfo* session AMQCPP_UNUSED )
         throw ( ConnectorException )
 {
     try
     {
         enforceConnected();
 
-        // TODO
-        return NULL;
+        commands::ActiveMQTempQueue* queue = new
+            commands::ActiveMQTempQueue( createTemporaryDestinationName() );
+
+        // Register it with the Broker
+        this->createTemporaryDestination( queue );
+
+        return queue;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( OpenWireConnectorException )
@@ -807,11 +833,17 @@
             dynamic_cast<OpenWireConsumerInfo*>(resource);
         OpenWireSessionInfo* session =
             dynamic_cast<OpenWireSessionInfo*>(resource);
+        commands::ActiveMQTempDestination* tempDestination =
+            dynamic_cast<commands::ActiveMQTempDestination*>(resource);
 
         if( consumer != NULL ) {
             dataStructure = consumer->getConsumerInfo();
         } else if( session != NULL ) {
             dataStructure = session->getSessionInfo();
+        } else if( tempDestination != NULL ) {
+            // User deletes these
+            destroyTemporaryDestination( tempDestination );
+            return;
         }
 
         if( dataStructure == NULL ) {
@@ -969,3 +1001,56 @@
     AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::createTemporaryDestination(  // Sends a DestinationInfo "add" request for tempDestination to the broker.
+    commands::ActiveMQTempDestination* tempDestination ) throw ( ConnectorException ) {
+
+    try {
+
+        commands::DestinationInfo command;  // local command object; its address is passed to syncRequest below
+        command.setConnectionId(  // receives a clone of this connector's connection id, not the original object
+            dynamic_cast<commands::ConnectionId*>(
+                connectionInfo.getConnectionId()->cloneDataStructure() ) );
+        command.setOperationType( 0 ); // 0 is add
+        command.setDestination(  // receives a clone, so the caller's tempDestination pointer is not handed to the command
+            dynamic_cast<commands::ActiveMQDestination*>(
+                tempDestination->cloneDataStructure() ) );
+
+        this->syncRequest( &command );  // NOTE(review): syncRequest returns a transport::Response* that is discarded here - confirm who owns it
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )  // macro bodies are not visible here; presumably rethrows ConnectorException as-is - TODO confirm
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )  // presumably converts anything else into OpenWireConnectorException - TODO confirm
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::destroyTemporaryDestination(  // Sends a DestinationInfo "remove" request for tempDestination to the broker.
+    commands::ActiveMQTempDestination* tempDestination ) throw ( ConnectorException ) {
+
+    try {
+
+        commands::DestinationInfo command;  // local command object; its address is passed to syncRequest below
+        command.setConnectionId(  // receives a clone of this connector's connection id, not the original object
+            dynamic_cast<commands::ConnectionId*>(
+                connectionInfo.getConnectionId()->cloneDataStructure() ) );
+        command.setOperationType( 1 ); // 1 is remove
+        command.setDestination(  // receives a clone; tempDestination itself is not deleted or modified in this function
+            dynamic_cast<commands::ActiveMQDestination*>(
+                tempDestination->cloneDataStructure() ) );
+
+        this->syncRequest( &command );  // NOTE(review): syncRequest returns a transport::Response* that is discarded here - confirm who owns it
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )  // macro bodies are not visible here; presumably rethrows ConnectorException as-is - TODO confirm
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )  // presumably converts anything else into OpenWireConnectorException - TODO confirm
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string OpenWireConnector::createTemporaryDestinationName()  // Builds a name of the form "<connection id value>:<next temp destination id>".
+    throw ( ConnectorException )
+{
+    try {
+        return connectionInfo.getConnectionId()->getValue() + ":" +  // prefix is the connection id's value followed by a colon
+               util::Long::toString( getNextTempDestinationId() );  // each call consumes one id from the temp-destination counter
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )  // macro bodies are not visible here; presumably rethrows ConnectorException as-is - TODO confirm
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )  // presumably converts anything else into OpenWireConnectorException - TODO confirm
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=508772&r1=508771&r2=508772
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
Sat Feb 17 07:44:58 2007
@@ -57,6 +57,7 @@
 #include <activemq/connector/openwire/commands/ConnectionInfo.h>
 #include <activemq/connector/openwire/commands/BrokerInfo.h>
 #include <activemq/connector/openwire/commands/WireFormatInfo.h>
+#include <activemq/connector/openwire/commands/ActiveMQTempDestination.h>
 
 namespace activemq{
 namespace connector{
@@ -157,6 +158,11 @@
         long long nextSessionId;
 
         /**
+         * Next Temporary Destination Id
+         */
+        long long nextTempDestinationId;
+
+        /**
          * Properties for the connector.
          */
         util::SimpleProperties properties;
@@ -543,6 +549,7 @@
         long long getNextProducerId();
         long long getNextTransactionId();
         long long getNextSessionId();
+        long long getNextTempDestinationId();
 
         // Check for Connected State and Throw an exception if not.
         void enforceConnected() throw ( ConnectorException );
@@ -563,7 +570,8 @@
          * @throws ConnectorException thrown if an error response was received
          * from the broker, or if any other error occurred.
          */
-        transport::Response* syncRequest(transport::Command* command) throw (ConnectorException);
+        transport::Response* syncRequest( transport::Command* command )
+            throw (ConnectorException);
 
         /**
          * Sends a message to the broker to dispose of the given resource.
@@ -571,7 +579,34 @@
          * @throw ConnectorException if any problems occur from sending
          * the message.
          */
-        void disposeOf(commands::DataStructure* objectId) throw (ConnectorException);
+        void disposeOf( commands::DataStructure* objectId )
+            throw ( ConnectorException );
+
+        /**
+         * Send the Destination Creation Request to the Broker, alerting it
+         * that we've created a new Temporary Destination.
+         * @param tempDestination - The new Temporary Destination
+         */
+        void createTemporaryDestination(
+            commands::ActiveMQTempDestination* tempDestination )
+                throw ( ConnectorException );
+
+        /**
+         * Send the Destination Destruction Request to the Broker, alerting
+         * it that we've removed an existing Temporary Destination.
+         * @param tempDestination - The Temporary Destination to remove
+         */
+        void destroyTemporaryDestination(
+            commands::ActiveMQTempDestination* tempDestination )
+                throw ( ConnectorException );
+
+        /**
+         * Creates a new Temporary Destination name using the connection id
+         * and a rolling count.
+         * @returns a unique Temporary Destination name
+         */
+        std::string createTemporaryDestinationName()
+            throw ( ConnectorException );
 
     };
 



Mime
View raw message