activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r507795 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: Makefile.am activemq/connector/openwire/BrokerException.h activemq/connector/openwire/OpenWireConnector.cpp activemq/connector/openwire/OpenWireConnector.h
Date Thu, 15 Feb 2007 03:05:25 GMT
Author: nmittler
Date: Wed Feb 14 19:05:24 2007
New Revision: 507795

URL: http://svn.apache.org/viewvc?view=rev&rev=507795
Log:
[AMQCPP-30] starting to add session functionality to openwire connector

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/BrokerException.h
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?view=diff&rev=507795&r1=507794&r2=507795
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Wed Feb 14 19:05:24 2007
@@ -212,6 +212,7 @@
     activemq/connector/openwire/OpenWireProducerInfo.h \
     activemq/connector/openwire/OpenWireSessionInfo.h \
     activemq/connector/openwire/OpenWireTransactionInfo.h \
+    activemq/connector/openwire/BrokerException.h \
     activemq/connector/openwire/marshal/BaseDataStreamMarshaller.h \
     activemq/connector/openwire/utils/HexTable.h \
     activemq/connector/openwire/utils/BooleanStream.h \

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/BrokerException.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/BrokerException.h?view=auto&rev=507795
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/BrokerException.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/BrokerException.h Wed Feb 14 19:05:24 2007
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_OPENWIRE_BROKEREXCEPTION_H_
+#define _ACTIVEMQ_CONNECTOR_OPENWIRE_BROKEREXCEPTION_H_
+
+#include <activemq/connector/openwire/OpenWireConnectorException.h>
+#include <activemq/connector/openwire/commands/BrokerError.h>
+#include <sstream>
+
+namespace activemq{
+namespace connector{
+namespace openwire{
+    
+    class BrokerException : public OpenWireConnectorException {
+        
+        BrokerException() throw(){}
+        BrokerException( const exceptions::ActiveMQException& ex ) throw()
+            : OpenWireConnectorException(){
+            *( exceptions::ActiveMQException* )this = ex;
+        }
+        BrokerException( const BrokerException& ex ) throw()
+            : OpenWireConnectorException(){
+            *( exceptions::ActiveMQException* )this = ex;
+        }
+        BrokerException( const char* file,
+                                    const int lineNumber,
+                                    const char* msg, ... ) throw()
+            : OpenWireConnectorException()
+        {
+            va_list vargs;
+            va_start( vargs, msg );
+            buildMessage( msg, vargs );
+
+            // Set the first mark for this exception.
+            setMark( file, lineNumber );
+        }
+        
+        BrokerException( const char* file, 
+                         const int lineNumber, 
+                         const commands::BrokerError* error ) throw()
+            : OpenWireConnectorException()
+        {
+            std::ostringstream ostream;
+            ostream << "*** BEGIN SERVER-SIDE STACK TRACE ***" << std::endl;
+            ostream << "Message: " << error->getMessage() << std::endl;
+            ostream << "Cause: " << error->getCause() << std::endl;
+            ostream << "Exception Class " << error->getExceptionClass() << std::endl;
+            
+            const std::vector<commands::BrokerError::StackTraceElement*>& trace = error->getStackTraceElements();
+            for( std::size_t ix=0; ix<trace.size(); ++ix ){
+                commands::BrokerError::StackTraceElement* element = trace[ix];
+                ostream << "\t[FILE: " << element->FileName << ", LINE: " << element->LineNumber
+                    << "] occurred in: " << element->ClassName << "." << element->MethodName << std::endl;
+            }
+            
+            ostream << "*** END SERVER-SIDE STACK TRACE ***";
+            
+            setMessage( ostream.str().c_str() );            
+            setMark( file, lineNumber );            
+        }
+
+        /**
+         * Clones this exception.  This is useful for cases where you need
+         * to preserve the type of the original exception as well as the message.
+         * All subclasses should override.
+         */
+        virtual exceptions::ActiveMQException* clone() const{
+            return new BrokerException( *this );
+        }
+
+        virtual ~BrokerException() throw() {}
+    };
+    
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_OPENWIRE_BROKEREXCEPTION_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=507795&r1=507794&r2=507795
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp Wed Feb 14 19:05:24 2007
@@ -25,6 +25,7 @@
 #include <activemq/util/Integer.h>
 #include <activemq/util/Guid.h>
 #include <activemq/connector/openwire/OpenWireConnectorException.h>
+#include <activemq/connector/openwire/BrokerException.h>
 #include <activemq/connector/openwire/OpenWireFormatFactory.h>
 
 #include <activemq/connector/openwire/commands/ConnectionId.h>
@@ -34,6 +35,7 @@
 #include <activemq/connector/openwire/commands/MessageDispatch.h>
 #include <activemq/connector/openwire/commands/WireFormatInfo.h>
 #include <activemq/connector/openwire/commands/BrokerInfo.h>
+#include <activemq/connector/openwire/commands/BrokerError.h>
 
 using namespace std;
 using namespace activemq;
@@ -66,6 +68,7 @@
     this->brokerWireFormatInfo = NULL;
     this->nextProducerId = 1;
     this->nextTransactionId = 1;
+    this->nextSessionId = 1;
     this->properties.copy( &properties );
     this->wireFormat = dynamic_cast<OpenWireFormat*>(
         wireFormatFactory.createWireFormat( properties ) );
@@ -101,7 +104,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-unsigned int OpenWireConnector::getNextProducerId()
+long long OpenWireConnector::getNextProducerId()
 {
     synchronized( &mutex )
     {
@@ -112,7 +115,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-unsigned int OpenWireConnector::getNextTransactionId()
+long long OpenWireConnector::getNextTransactionId()
 {
     synchronized( &mutex )
     {
@@ -123,6 +126,17 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+long long OpenWireConnector::getNextSessionId()
+{
+    synchronized( &mutex )
+    {
+        return nextSessionId++;
+    }
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void OpenWireConnector::enforceConnected() throw ( ConnectorException )
 {
     if( state != CONNECTED )
@@ -155,7 +169,7 @@
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException );
+    AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -176,7 +190,7 @@
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException );
+    AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -205,17 +219,7 @@
         connectionInfo.setConnectionId( connectionId );
 
         // Now we ping the broker and see if we get an ack / nack
-        Response* response = transport->request( &connectionInfo );
-
-        if( dynamic_cast<ExceptionResponse*>( response ) != NULL )
-        {
-            delete response;
-
-            throw OpenWireConnectorException(
-                __FILE__,
-                __LINE__,
-                "OpenWireConnector::connect - Failed on Connect Request" );
-        }
+        Response* response = syncRequest( &connectionInfo );
 
         // Tag us in the Connected State now.
         state = CONNECTED;
@@ -223,7 +227,7 @@
         // Clean up the ack
         delete response;
     }
-    AMQ_CATCH_RETHROW( BrokerError )
+    AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
@@ -237,20 +241,18 @@
         state = DISCONNECTED;
 
         // Remove our ConnectionId from the Broker
-        RemoveInfo remove;
-        remove.setObjectId( connectionInfo.getConnectionId()->cloneDataStructure() );
-        transport->oneway( &remove );
+        disposeOf( connectionInfo.getConnectionId() );
 
         // Send the disconnect command to the broker.
         ShutdownInfo shutdown;
-        transport->oneway( &shutdown );
+        oneway( &shutdown );
 
-    } catch( CommandIOException& ex ){
+    } catch( ConnectorException& ex ){
         transport->close();
         throw ex;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException );
+    AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -262,11 +264,36 @@
     {
         enforceConnected();
 
-        // TODO
-        return NULL;
+        // Create and initialize a new SessionInfo object
+        commands::SessionInfo* info = new commands::SessionInfo();
+        SessionId* sessionId = new SessionId();
+        sessionId->setConnectionId( connectionInfo.getConnectionId()->getValue() );
+        sessionId->setValue( getNextSessionId() );
+        info->setSessionId( sessionId );
+            
+        try{
+            
+            // Send the subscription message to the broker.
+            Response* response = syncRequest(info);
+            
+            // The broker did not return an error - this is good.
+            // Just discard the response.
+            delete response;
+            
+            // Return the session info.
+            return NULL; /* TODO: Find a way to bridge between commands::SessionInfo and connector::SessionInfo */
+            
+        } catch( ConnectorException& ex ) {
+            
+            // Something bad happened - free the session info object.
+            delete info;
+            
+            ex.setMark(__FILE__, __LINE__);
+            throw ex;
+        }        
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -285,7 +312,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -305,7 +332,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -322,7 +349,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -338,7 +365,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -354,7 +381,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -370,7 +397,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -386,7 +413,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -438,7 +465,7 @@
             ex.what() );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -458,7 +485,7 @@
         }
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -502,7 +529,7 @@
             ex.what() );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -537,7 +564,7 @@
             ex.what() );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
     return NULL;
 }
 
@@ -563,7 +590,7 @@
             ex.what() );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -588,7 +615,7 @@
             ex.what() );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -606,7 +633,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -624,7 +651,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -642,7 +669,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -660,7 +687,7 @@
         return NULL;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -671,7 +698,7 @@
     {
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -691,7 +718,7 @@
         delete resource;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -747,7 +774,7 @@
         }
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -769,5 +796,65 @@
         fire( ex );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
+    AMQ_CATCHALL_THROW( ConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::oneway(Command* command) throw (ConnectorException)
+{
+    try
+    {
+        enforceConnected();
+        transport->oneway(command);
+    }
+    AMQ_CATCH_EXCEPTION_CONVERT( CommandIOException, OpenWireConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( UnsupportedOperationException, OpenWireConnectorException )
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+Response* OpenWireConnector::syncRequest(Command* command) throw (ConnectorException)
+{
+    try
+    {
+        enforceConnected();
+        Response* response = transport->request(command);
+        ExceptionResponse* exceptionResponse = dynamic_cast<ExceptionResponse*>(response);
+        if( exceptionResponse != NULL )
+        {   
+            // Create an exception to hold the error information.
+            /*commands::BrokerError* brokerError = dynamic_cast<commands::BrokerError*>(exceptionResponse->getException());
+            BrokerException exception( __FILE__, __LINE__, brokerError );*/
+            /* TODO: Bridge between transport::BrokerError and openwire::commands::BrokerError */
+            OpenWireConnectorException exception( __FILE__, __LINE__, "An error occurred at the broker" );
+            
+            // Free the response command.
+            delete response;
+            
+            // Throw the exception.
+            throw exception;
+        }
+        
+        // Nothing bad happened - just return the response.
+        return response;
+    }
+    AMQ_CATCH_EXCEPTION_CONVERT( CommandIOException, OpenWireConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( UnsupportedOperationException, OpenWireConnectorException )
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::disposeOf(DataStructure* objectId) throw (ConnectorException)
+{
+    try
+    {
+        RemoveInfo command;
+        command.setObjectId( objectId->cloneDataStructure() );
+        oneway(&command);
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
+}
+

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=507795&r1=507794&r2=507795
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h Wed Feb 14 19:05:24 2007
@@ -139,12 +139,17 @@
         /**
          * Next avaliable Producer Id
          */
-        unsigned int nextProducerId;
+        long long nextProducerId;
 
         /**
          * Next avaliable Transaction Id
          */
-        unsigned int nextTransactionId;
+        long long nextTransactionId;
+        
+        /**
+         * Next available Session Id.
+         */
+        long long nextSessionId;
 
         /**
          * Properties for the connector.
@@ -529,11 +534,38 @@
 
     private:
 
-        unsigned int getNextProducerId(void);
-        unsigned int getNextTransactionId(void);
+        long long getNextProducerId();
+        long long getNextTransactionId();
+        long long getNextSessionId();
 
         // Check for Connected State and Throw an exception if not.
-        void enforceConnected( void ) throw ( ConnectorException );
+        void enforceConnected() throw ( ConnectorException );
+        
+        /**
+         * Sends a oneway message.
+         * @param command The message to send.
+         * @throws ConnectorException if not currently connected, or
+         * if the operation fails for any reason.
+         */
+        void oneway( transport::Command* command ) throw (ConnectorException);
+        
+        /**
+         * Sends a synchronous request and returns the response from the broker.
+         * Converts any error responses into an exception.
+         * @param command The request command.
+         * @returns The response sent from the broker.
+         * @throws ConnectorException thrown if an error response was received
+         * from the broker, or if any other error occurred.
+         */         
+        transport::Response* syncRequest(transport::Command* command) throw (ConnectorException);
+        
+        /**
+         * Sends a message to the broker to dispose of the given resource.
+         * @param objectId The ID of the resource to be released.
+         * @throw ConnectorException if any problems occur from sending
+         * the message.
+         */
+        void disposeOf(commands::DataStructure* objectId) throw (ConnectorException);
 
     };
 



Mime
View raw message