activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r736842 [1/5] - in /activemq/activemq-cpp/trunk/src: main/ main/activemq/commands/ main/activemq/connector/ main/activemq/core/ main/activemq/exceptions/ main/activemq/library/ main/activemq/transport/ main/activemq/transport/mock/ main/act...
Date Thu, 22 Jan 2009 22:55:28 GMT
Author: tabish
Date: Thu Jan 22 14:55:27 2009
New Revision: 736842

URL: http://svn.apache.org/viewvc?rev=736842&view=rev
Log:
Large refactoring of the Core package and removal of the connector code throughout the code base.  The code will still segfault occasionally, but the initial structure of the new design is working now.

Tests have been modified, and tests that use Stomp are disabled for now because Stomp support is not working.

Added:
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.cpp
      - copied, changed from r732499, activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.h
      - copied, changed from r732499, activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/Synchronization.h   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/exceptions/BrokerException.h   (contents, props changed)
      - copied, changed from r733509, activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/BrokerException.h
    activemq/activemq-cpp/trunk/src/main/activemq/util/Usage.h   (with props)
Removed:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionData.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/BrokerException.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/
Modified:
    activemq/activemq-cpp/trunk/src/main/Makefile.am
    activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h
    activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempQueue.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQAckHandler.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/DispatchData.h
    activemq/activemq-cpp/trunk/src/main/activemq/library/ActiveMQCPP.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/util/MemoryUsage.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
    activemq/activemq-cpp/trunk/src/test-integration/Makefile.am
    activemq/activemq-cpp/trunk/src/test-integration/TestRegistry.cpp
    activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireAsyncSenderTest.cpp
    activemq/activemq-cpp/trunk/src/test/Makefile.am
    activemq/activemq-cpp/trunk/src/test/activemq/cmsutil/CmsTemplateTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/cmsutil/DummyConnection.h
    activemq/activemq-cpp/trunk/src/test/activemq/cmsutil/DummySession.h
    activemq/activemq-cpp/trunk/src/test/activemq/cmsutil/DynamicDestinationResolverTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/cmsutil/SessionPoolTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionFactoryTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.h
    activemq/activemq-cpp/trunk/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.am?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/main/Makefile.am Thu Jan 22 14:55:27 2009
@@ -35,17 +35,14 @@
     activemq/cmsutil/CmsTemplate.cpp \
     activemq/core/ActiveMQConsumer.cpp \
     activemq/core/ActiveMQConnection.cpp \
+    activemq/core/ActiveMQConnectionSupport.cpp \
     activemq/core/ActiveMQConnectionMetaData.cpp \
     activemq/core/ActiveMQSession.cpp \
     activemq/core/ActiveMQConstants.cpp \
-    activemq/core/ActiveMQTransaction.cpp \
+    activemq/core/ActiveMQTransactionContext.cpp \
     activemq/core/ActiveMQProducer.cpp \
     activemq/core/ActiveMQConnectionFactory.cpp \
     activemq/core/ActiveMQSessionExecutor.cpp \
-    activemq/connector/openwire/OpenWireConnector.cpp \
-    activemq/connector/openwire/OpenWireConnectorFactory.cpp \
-    activemq/connector/ConnectorFactoryMap.cpp \
-    activemq/connector/BaseConnectorResource.cpp \
     activemq/transport/TransportFilter.cpp \
     activemq/transport/TransportRegistry.cpp \
     activemq/transport/AbstractTransportFactory.cpp \
@@ -67,11 +64,6 @@
     activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp \
     activemq/wireformat/openwire/utils/MessagePropertyInterceptor.cpp \
     activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp \
-    activemq/wireformat/stomp/StompWireFormat.cpp \
-    activemq/wireformat/stomp/StompWireFormatFactory.cpp \
-    activemq/wireformat/stomp/StompResponseBuilder.cpp \
-    activemq/wireformat/stomp/marshal/Marshaler.cpp \
-    activemq/wireformat/stomp/commands/CommandConstants.cpp \
     decaf/internal/DecafRuntime.cpp \
     decaf/internal/AprPool.cpp \
     decaf/internal/util/ByteArrayAdapter.cpp \
@@ -143,22 +135,22 @@
     decaf/nio/FloatBuffer.cpp \
     decaf/nio/IntBuffer.cpp \
     decaf/nio/LongBuffer.cpp \
-    decaf/nio/ShortBuffer.cpp \
-    activemq/connector/stomp/StompSessionManager.cpp \
-    activemq/connector/stomp/commands/CommandConstants.cpp \
-    activemq/connector/stomp/StompConnectorFactory.cpp \
-    activemq/connector/stomp/StompConnector.cpp \
-    activemq/connector/stomp/StompSelector.cpp \
-    activemq/connector/stomp/StompConnectionNegotiator.cpp
+    decaf/nio/ShortBuffer.cpp
+
+#    activemq/wireformat/stomp/StompWireFormat.cpp \
+#    activemq/wireformat/stomp/StompWireFormatFactory.cpp \
+#    activemq/wireformat/stomp/StompResponseBuilder.cpp \
+#    activemq/wireformat/stomp/marshal/Marshaler.cpp \
+#    activemq/wireformat/stomp/commands/CommandConstants.cpp
 
 h_sources = \
     activemq/library/ActiveMQCPP.h \
     activemq/core/ActiveMQProducer.h \
     activemq/core/ActiveMQMessage.h \
-    activemq/core/ActiveMQConnectionData.h \
     activemq/core/ActiveMQConnection.h \
+    activemq/core/ActiveMQConnectionSupport.h \
     activemq/core/ActiveMQConnectionMetaData.h \
-    activemq/core/ActiveMQTransaction.h \
+    activemq/core/ActiveMQTransactionContext.h \
     activemq/core/ActiveMQConnectionFactory.h \
     activemq/core/ActiveMQConsumer.h \
     activemq/core/ActiveMQSession.h \
@@ -167,6 +159,7 @@
     activemq/core/ActiveMQSessionExecutor.h \
     activemq/core/DispatchData.h \
     activemq/core/Dispatcher.h \
+    activemq/core/Synchronization.h \
     activemq/cmsutil/CachedConsumer.h \
     activemq/cmsutil/CachedProducer.h \
     activemq/cmsutil/CmsAccessor.h \
@@ -183,24 +176,8 @@
     activemq/io/LoggingInputStream.h \
     activemq/io/LoggingOutputStream.h \
     activemq/exceptions/ActiveMQException.h \
+    activemq/exceptions/BrokerException.h \
     activemq/exceptions/ExceptionDefines.h \
-    activemq/connector/ConsumerMessageListener.h \
-    activemq/connector/ConnectorException.h \
-    activemq/connector/TransactionInfo.h \
-    activemq/connector/SessionInfo.h \
-    activemq/connector/ConnectorResource.h \
-    activemq/connector/ConnectorResourceListener.h \
-    activemq/connector/ConnectorFactory.h \
-    activemq/connector/BaseConnectorResource.h \
-    activemq/connector/openwire/OpenWireConsumerInfo.h \
-    activemq/connector/openwire/OpenWireProducerInfo.h \
-    activemq/connector/openwire/OpenWireSessionInfo.h \
-    activemq/connector/openwire/OpenWireTransactionInfo.h \
-    activemq/connector/ConnectorFactoryMap.h \
-    activemq/connector/Connector.h \
-    activemq/connector/ConnectorFactoryMapRegistrar.h \
-    activemq/connector/ProducerInfo.h \
-    activemq/connector/ConsumerInfo.h \
     activemq/transport/Transport.h \
     activemq/transport/TransportFilter.h \
     activemq/transport/TransportFactory.h \
@@ -227,22 +204,14 @@
     activemq/util/PrimitiveList.h \
     activemq/util/URISupport.h \
     activemq/util/MemoryUsage.h \
+    activemq/util/Usage.h \
     activemq/wireformat/WireFormat.h \
     activemq/wireformat/WireFormatNegotiator.h \
     activemq/wireformat/WireFormatFactory.h \
     activemq/wireformat/WireFormatRegistry.h \
-    activemq/wireformat/stomp/StompWireFormat.h \
-    activemq/wireformat/stomp/StompWireFormatFactory.h \
-    activemq/wireformat/stomp/StompResponseBuilder.h \
-    activemq/wireformat/stomp/StompFrame.h \
-    activemq/wireformat/stomp/marshal/Marshalable.h \
-    activemq/wireformat/stomp/marshal/MarshalException.h \
-    activemq/wireformat/stomp/marshal/Marshaler.h \
-    activemq/wireformat/stomp/commands/CommandConstants.h \
     activemq/wireformat/openwire/OpenWireFormat.h \
     activemq/wireformat/openwire/OpenWireFormatNegotiator.h \
     activemq/wireformat/openwire/OpenWireResponseBuilder.h \
-    activemq/wireformat/openwire/BrokerException.h \
     activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.h \
     activemq/wireformat/openwire/utils/HexTable.h \
     activemq/wireformat/openwire/utils/BooleanStream.h \
@@ -430,39 +399,16 @@
     decaf/nio/BufferOverflowException.h \
     decaf/nio/BufferUnderflowException.h \
     decaf/nio/InvalidMarkException.h \
-    decaf/nio/ReadOnlyBufferException.h \
-    activemq/connector/stomp/StompSessionInfo.h \
-    activemq/connector/stomp/StompTransactionInfo.h \
-    activemq/connector/stomp/StompSessionManager.h \
-    activemq/connector/stomp/StompQueue.h \
-    activemq/connector/stomp/StompConnectorFactory.h \
-    activemq/connector/stomp/commands/DisconnectCommand.h \
-    activemq/connector/stomp/commands/AbortCommand.h \
-    activemq/connector/stomp/commands/ErrorCommand.h \
-    activemq/connector/stomp/commands/CommandConstants.h \
-    activemq/connector/stomp/commands/UnsubscribeCommand.h \
-    activemq/connector/stomp/commands/AbstractCommand.h \
-    activemq/connector/stomp/commands/StompMessage.h \
-    activemq/connector/stomp/commands/AckCommand.h \
-    activemq/connector/stomp/commands/StompCommand.h \
-    activemq/connector/stomp/commands/BytesMessageCommand.h \
-    activemq/connector/stomp/commands/ReceiptCommand.h \
-    activemq/connector/stomp/commands/SubscribeCommand.h \
-    activemq/connector/stomp/commands/BeginCommand.h \
-    activemq/connector/stomp/commands/TextMessageCommand.h \
-    activemq/connector/stomp/commands/ConnectCommand.h \
-    activemq/connector/stomp/commands/ConnectedCommand.h \
-    activemq/connector/stomp/commands/MessageCommand.h \
-    activemq/connector/stomp/commands/CommitCommand.h \
-    activemq/connector/stomp/StompCommandListener.h \
-    activemq/connector/stomp/StompConnectorException.h \
-    activemq/connector/stomp/StompConnectionNegotiator.h \
-    activemq/connector/stomp/StompProducerInfo.h \
-    activemq/connector/stomp/StompSelector.h \
-    activemq/connector/stomp/StompConsumerInfo.h \
-    activemq/connector/stomp/StompConnector.h \
-    activemq/connector/stomp/StompDestination.h \
-    activemq/connector/stomp/StompTopic.h
+    decaf/nio/ReadOnlyBufferException.h
+
+#    activemq/wireformat/stomp/StompWireFormat.h \
+#    activemq/wireformat/stomp/StompWireFormatFactory.h \
+#    activemq/wireformat/stomp/StompResponseBuilder.h \
+#    activemq/wireformat/stomp/StompFrame.h \
+#    activemq/wireformat/stomp/marshal/Marshalable.h \
+#    activemq/wireformat/stomp/marshal/MarshalException.h \
+#    activemq/wireformat/stomp/marshal/Marshaler.h \
+#    activemq/wireformat/stomp/commands/CommandConstants.h
 
 include activemq/commands/srcmakefile.mk
 include activemq/wireformat/openwire/marshal/v1/srcmakefile.mk

Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp Thu Jan 22 14:55:27 2009
@@ -21,19 +21,16 @@
 using namespace std;
 using namespace activemq;
 using namespace activemq::exceptions;
-using namespace activemq::connector;
 using namespace activemq::commands;
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQTempDestination::ActiveMQTempDestination() :
-    ActiveMQDestination(),
-    connector( NULL ) {
+    ActiveMQDestination() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQTempDestination::ActiveMQTempDestination( const std::string& name ) :
-    ActiveMQDestination( name ),
-    connector( NULL ) {
+    ActiveMQDestination( name ) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -44,3 +41,12 @@
 unsigned char ActiveMQTempDestination::getDataStructureType() const {
     return ActiveMQTempDestination::ID_ACTIVEMQTEMPDESTINATION;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTempDestination::close() throw( cms::CMSException ) {
+    try {
+        // TODO - Dispose of this Temp Dest.
+    }
+    AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h Thu Jan 22 14:55:27 2009
@@ -25,8 +25,6 @@
 
 #include <activemq/util/Config.h>
 #include <activemq/commands/ActiveMQDestination.h>
-#include <activemq/connector/Connector.h>
-#include <activemq/connector/BaseConnectorResource.h>
 #include <cms/Closeable.h>
 #include <vector>
 #include <string>
@@ -35,14 +33,14 @@
 namespace commands{
 
     class AMQCPP_API ActiveMQTempDestination : public ActiveMQDestination,
-                                               public connector::BaseConnectorResource {
+                                               public cms::Closeable {
     protected:
 
         /**
          * Connector that we call back on close to allow this resource to
          * be cleaned up correctly at this end and at the Broker End.
          */
-        connector::Connector* connector;
+        // TODO - Add something to ask for a way to send a dispose
 
     public:
 
@@ -99,6 +97,13 @@
             return ActiveMQDestination::equals( value );
         }
 
+        /**
+         * Closes down this Destination resulting in a call to dispose of the
+         * TempDestination resource at the Broker.
+         * throws cms::CMSException
+         */
+        virtual void close() throw( cms::CMSException );
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempQueue.h?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempQueue.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempQueue.h Thu Jan 22 14:55:27 2009
@@ -70,7 +70,7 @@
          * Converts the Destination Name into a String
          * @return string name
          */
-        virtual std::string toString(void) const {
+        virtual std::string toString() const {
             std::ostringstream stream;
 
             stream << "Begin Class = ActiveMQTempQueue" << std::endl;

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQAckHandler.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQAckHandler.h?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQAckHandler.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQAckHandler.h Thu Jan 22 14:55:27 2009
@@ -36,7 +36,7 @@
 
         /**
          * Method called to acknowledge the message passed
-         * @param message Message to Acknowlegde
+         * @param message Message to Acknowledge
          * @throw CMSException
          */
         virtual void acknowledgeMessage( const ActiveMQMessage* message )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Thu Jan 22 14:55:27 2009
@@ -18,60 +18,92 @@
 #include "ActiveMQConnection.h"
 
 #include <cms/Session.h>
+
 #include <activemq/core/ActiveMQSession.h>
-#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/core/ActiveMQConstants.h>
+#include <activemq/transport/Response.h>
+#include <activemq/exceptions/BrokerException.h>
+
 #include <decaf/lang/Boolean.h>
 #include <decaf/util/Iterator.h>
+#include <decaf/util/UUID.h>
+
+#include <activemq/commands/ActiveMQMessage.h>
+#include <activemq/commands/BrokerInfo.h>
+#include <activemq/commands/BrokerError.h>
+#include <activemq/commands/ConnectionId.h>
+#include <activemq/commands/DestinationInfo.h>
+#include <activemq/commands/ExceptionResponse.h>
+#include <activemq/commands/KeepAliveInfo.h>
+#include <activemq/commands/Message.h>
+#include <activemq/commands/MessagePull.h>
+#include <activemq/commands/MessageAck.h>
+#include <activemq/commands/MessageDispatch.h>
+#include <activemq/commands/ProducerAck.h>
+#include <activemq/commands/ProducerInfo.h>
+#include <activemq/commands/RemoveInfo.h>
+#include <activemq/commands/ShutdownInfo.h>
+#include <activemq/commands/SessionInfo.h>
+#include <activemq/commands/WireFormatInfo.h>
+#include <activemq/commands/RemoveSubscriptionInfo.h>
 
 using namespace std;
 using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
-using namespace activemq::connector;
 using namespace activemq::exceptions;
+using namespace decaf;
 using namespace decaf::util;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConnection::ActiveMQConnection(ActiveMQConnectionData* connectionData) {
-    this->connectionData = connectionData;
+ActiveMQConnection::ActiveMQConnection( transport::Transport* transport,
+                                        decaf::util::Properties* properties )
+ :  ActiveMQConnectionSupport( transport, properties ){
+
     this->started = false;
     this->closed = false;
     this->exceptionListener = NULL;
     this->connectionMetaData.reset( new ActiveMQConnectionMetaData() );
 
     // Register for messages and exceptions from the connector.
-    Connector* connector = connectionData->getConnector();
-    connector->setConsumerMessageListener( this );
-    connector->setExceptionListener( this );
+    transport->setCommandListener( this );
+    transport->setTransportExceptionListener( this );
+
+    // Now Start the Transport
+    transport->start();
+
+    // Attempt to register with the Broker
+    this->connect();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQConnection::~ActiveMQConnection() {
     try {
-        close();
+        this->close();
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::addDispatcher( connector::ConsumerInfo* consumer,
+void ActiveMQConnection::addDispatcher( commands::ConsumerInfo* consumer,
     Dispatcher* dispatcher ) {
 
     // Add the consumer to the map.
     synchronized( &dispatchers ) {
-        dispatchers.setValue( consumer->getConsumerId(), dispatcher );
+        dispatchers.setValue( consumer->getConsumerId()->getValue(), dispatcher );
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeDispatcher( const connector::ConsumerInfo* consumer ) {
+void ActiveMQConnection::removeDispatcher( const commands::ConsumerInfo* consumer ) {
 
     // Remove the consumer from the map.
     synchronized( &dispatchers ) {
-        dispatchers.remove( consumer->getConsumerId() );
+        dispatchers.remove( consumer->getConsumerId()->getValue() );
     }
 }
 
@@ -91,11 +123,21 @@
 
     try {
 
+        enforceConnected();
+
+        // Create and initialize a new SessionInfo object
+        std::auto_ptr<commands::SessionInfo> sessionInfo( new commands::SessionInfo() );
+        std::auto_ptr<commands::SessionId> sessionId( new commands::SessionId() );
+        sessionId->setConnectionId( connectionInfo.getConnectionId()->getValue() );
+        sessionId->setValue( this->getNextSessionId() );
+        sessionInfo->setSessionId( sessionId.release() );
+
+        // Send the subscription message to the broker.
+        syncRequest( sessionInfo.get() );
+
         // Create the session instance.
         ActiveMQSession* session = new ActiveMQSession(
-            connectionData->getConnector()->createSession( ackMode ),
-            connectionData->getProperties(),
-            this );
+            sessionInfo.release(), ackMode, this->getProperties(), this );
 
         // Add the session to the set of active sessions.
         synchronized( &activeSessions ) {
@@ -115,13 +157,63 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::removeSession( ActiveMQSession* session )
+    throw ( cms::CMSException ) {
+
+    try {
+
+        // Remove this session from the set of active sessions.
+        synchronized( &activeSessions ) {
+            activeSessions.remove( session );
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::addProducer( ActiveMQProducer* producer )
+    throw ( cms::CMSException ) {
+
+    try {
+
+        // Add this producer to the set of active producers.
+        synchronized( &activeProducers ) {
+            activeProducers.setValue(
+                producer->getProducerInfo()->getProducerId()->getValue(), producer );
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::removeProducer( ActiveMQProducer* producer )
+    throw ( cms::CMSException ) {
+
+    try {
+
+        // Remove this producer from the set of active producers.
+        synchronized( &activeProducers ) {
+            activeProducers.remove(
+                producer->getProducerInfo()->getProducerId()->getValue() );
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 std::string ActiveMQConnection::getClientID() const {
 
-    if( closed ) {
+    if( this->isClosed() ) {
         return "";
     }
 
-    return connectionData->getConnector()->getClientId();
+    return this->getClientId();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -129,7 +221,7 @@
 {
     try {
 
-        if( closed ) {
+        if( this->isClosed() ) {
             return;
         }
 
@@ -149,17 +241,13 @@
             }
         }
 
+        // Now inform the Broker we are shutting down.
+        this->disconnect();
+
         // Once current deliveries are done this stops the delivery
         // of any new messages.
-        started = false;
-        closed = true;
-
-        // Destroy the connection data.  This will close the connector
-        // and transports.
-        if( connectionData != NULL ){
-            delete connectionData;
-            connectionData = NULL;
-        }
+        this->started = false;
+        this->closed = true;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -170,6 +258,8 @@
 void ActiveMQConnection::start() throw ( cms::CMSException ) {
     try{
 
+        enforceConnected();
+
         // This starts or restarts the delivery of all incomming messages
         // messages delivered while this connection is stopped are dropped
         // and not acknowledged.
@@ -191,15 +281,17 @@
 
     try {
 
+        enforceConnected();
+
         // Once current deliveries are done this stops the delivery of any
         // new messages.
         started = false;
 
-        Iterator<ActiveMQSession*>* iter = activeSessions.iterator();
+        std::auto_ptr< Iterator<ActiveMQSession*> > iter( activeSessions.iterator() );
+
         while( iter->hasNext() ){
             iter->next()->stop();
         }
-        delete iter;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -207,69 +299,52 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onConsumerMessage( connector::ConsumerInfo* consumer,
-                                            core::ActiveMQMessage* message ) {
-    try {
+void ActiveMQConnection::connect() throw ( activemq::exceptions::ActiveMQException ) {
 
-        if( connectionData == NULL) {
-            ActiveMQException ex(
-                __FILE__, __LINE__,
-                "ActiveMQConnection::onConsumerMessage - "
-                "Connection Data Null, could be closed." );
+    try{
 
-            fire( ex );
+        // Start the Transport
+        this->startupTransport();
 
-            return;
-        }
+        // Fill in our connection info.
+        connectionInfo.setUserName( this->getUsername() );
+        connectionInfo.setPassword( this->getPassword() );
+
+        // Get or Create a Client Id
+        string clientId = this->getClientId();
+        if( clientId.length() > 0 ){
+            connectionInfo.setClientId( clientId );
+        } else {
+            connectionInfo.setClientId( UUID::randomUUID().toString() );
+        }
+
+        // Generate a connectionId
+        commands::ConnectionId* connectionId = new commands::ConnectionId();
+        connectionId->setValue( UUID::randomUUID().toString() );
+        connectionInfo.setConnectionId( connectionId );
 
-        // Look up the dispatcher.
-        Dispatcher* dispatcher = NULL;
-        synchronized( &dispatchers ) {
-
-            dispatcher = dispatchers.getValue(consumer->getConsumerId());
-
-            // If we have no registered dispatcher, the consumer was probably
-            // just closed.  Just delete the message.
-            if( dispatcher == NULL ) {
-                delete message;
-            } else {
-
-                // Dispatch the message.
-                DispatchData data( consumer, message );
-                dispatcher->dispatch( data );
-            }
-        }
-    }
-    catch( exceptions::ActiveMQException& ex ) {
-        ex.setMark( __FILE__, __LINE__ );
-        fire( ex );
-    }
-    catch( ... ) {
-        exceptions::ActiveMQException ex(
-           __FILE__, __LINE__,
-           "IOTransport::run - caught unknown exception" );
-        fire( ex );
+        // Now we ping the broker and see if we get an ack / nack
+        syncRequest( &connectionInfo );
     }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onException( const CMSException& ex ){
+void ActiveMQConnection::disconnect() throw ( activemq::exceptions::ActiveMQException ) {
 
-    if( exceptionListener != NULL ){
-        exceptionListener->onException( ex );
-    }
-}
+    try{
 
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeSession( ActiveMQSession* session )
-    throw ( cms::CMSException ) {
+        // Remove our ConnectionId from the Broker
+        disposeOf( connectionInfo.getConnectionId(), this->getCloseTimeout() );
 
-    try {
+        // Send the disconnect command to the broker.
+        commands::ShutdownInfo shutdown;
+        oneway( &shutdown );
 
-        // Remove this session from the set of active sessions.
-        synchronized( &activeSessions ) {
-            activeSessions.remove( session );
-        }
+        // Allow the Support class to shutdown its resources, including the Transport.
+        this->shutdownTransport();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -277,16 +352,20 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
-    throw ( ActiveMQException ) {
+void ActiveMQConnection::sendPullRequest(
+    const commands::ConsumerInfo* consumer, long long timeout ) throw ( ActiveMQException ) {
 
     try {
 
-        if( !this->connectionData->getConnector()->isMessagePullSupported() ) {
-            return;
-        }
+         if( consumer->getPrefetchSize() == 0 ) {
 
-        this->connectionData->getConnector()->pullMessage( consumer, timeout );
+             commands::MessagePull messagePull;
+             messagePull.setConsumerId( consumer->getConsumerId()->cloneDataStructure() );
+             messagePull.setDestination( consumer->getDestination()->cloneDataStructure() );
+             messagePull.setTimeout( timeout );
+
+             this->oneway( &messagePull );
+         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -307,13 +386,19 @@
                 __FILE__, __LINE__, "Destination passed was NULL" );
         }
 
-        if( this->isClosed() ) {
-            throw IllegalStateException(
-                __FILE__, __LINE__, "Connection Closed" );
-        }
+        enforceConnected();
 
-        // Ask the connector to perform a remove.
-        this->connectionData->getConnector()->destroyDestination( destination );
+        const commands::ActiveMQDestination* amqDestination =
+            dynamic_cast<const commands::ActiveMQDestination*>( destination );
+
+        commands::DestinationInfo command;
+
+        command.setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure() );
+        command.setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
+        command.setDestination( amqDestination->cloneDataStructure() );
+
+        // Send the message to the broker.
+        syncRequest( &command );
     }
     AMQ_CATCH_RETHROW( NullPointerException )
     AMQ_CATCH_RETHROW( IllegalStateException )
@@ -321,3 +406,245 @@
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onCommand( transport::Command* command ) {
+
+    try{
+
+        if( typeid( *command ) == typeid( commands::MessageDispatch ) ) {
+
+            commands::MessageDispatch* dispatch =
+                dynamic_cast<commands::MessageDispatch*>( command );
+
+            // Due to the severe suckiness of C++, in order to cast to
+            // a type that is in a different branch of the inheritance hierarchy
+            // we have to cast to the type at the "crotch" of the branch and then
+            // we can implicitly cast up the other branch.
+            core::ActiveMQMessage* message =
+                dynamic_cast<core::ActiveMQMessage*>( dispatch->getMessage() );
+            if( message == NULL ) {
+                delete command;
+                throw ActiveMQException(
+                    __FILE__, __LINE__,
+                    "ActiveMQConnection::onCommand - "
+                    "Received unsupported dispatch message" );
+            }
+
+            // Look up the dispatcher.
+            Dispatcher* dispatcher = NULL;
+            synchronized( &dispatchers ) {
+
+                dispatcher = dispatchers.getValue( dispatch->getConsumerId()->getValue() );
+
+                // If we have no registered dispatcher, the consumer was probably
+                // just closed.  Just delete the message.
+                if( dispatcher == NULL ) {
+                    delete message;
+                } else {
+
+                    // Dispatch the message.
+                    DispatchData data( dispatch->getConsumerId(), message );
+                    dispatcher->dispatch( data );
+                }
+            }
+
+            // Clear the Message as we've passed it onto the
+            // listener, who is responsible for deleting it at
+            // the appropriate time, which depends on things like
+            // the session being transacted etc.
+            dispatch->setMessage( NULL );
+            dispatch->setConsumerId( NULL );
+
+            delete command;
+
+        } else if( typeid( *command ) == typeid( commands::ProducerAck ) ) {
+
+            commands::ProducerAck* producerAck =
+                dynamic_cast<commands::ProducerAck*>( command );
+
+            // Look up the registered producer that this ack is addressed to.
+            ActiveMQProducer* producer = NULL;
+            synchronized( &this->activeProducers ) {
+                producer = this->activeProducers.getValue( producerAck->getProducerId()->getValue() );
+                if( producer != NULL ){
+                    producer->onProducerAck( *producerAck );
+                }
+            }
+
+            delete command;
+
+        } else if( typeid( *command ) == typeid( commands::WireFormatInfo ) ) {
+            this->brokerWireFormatInfo.reset(
+                dynamic_cast<commands::WireFormatInfo*>( command ) );
+        } else if( typeid( *command ) == typeid( commands::BrokerInfo ) ) {
+            this->brokerInfo.reset(
+                dynamic_cast<commands::BrokerInfo*>( command ) );
+        } else if( typeid( *command ) == typeid( commands::KeepAliveInfo ) ) {
+
+            if( command->isResponseRequired() ) {
+                command->setResponseRequired( false );
+
+                oneway( command );
+            }
+
+            delete command;
+
+        } else if( typeid( *command ) == typeid( commands::ShutdownInfo ) ) {
+
+            try {
+                if( !this->isClosed() ) {
+                    fire( ActiveMQException(
+                        __FILE__, __LINE__,
+                        "ActiveMQConnection::onCommand - "
+                        "Broker closed this connection."));
+                }
+            } catch( ... ) { /* do nothing */ }
+
+            delete command;
+
+        } else {
+            //LOGDECAF_WARN( logger, "Received an unknown command" );
+            delete command;
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onTransportException( transport::Transport* source AMQCPP_UNUSED,
+                                               const decaf::lang::Exception& ex ) {
+
+    try {
+
+        // We're disconnected - the asynchronous error is expected.
+        if( this->isClosed() ){
+            return;
+        }
+
+        // Inform the user of the error.
+        fire( exceptions::ActiveMQException( ex ) );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::oneway( transport::Command* command )
+    throw ( ActiveMQException ) {
+
+    try {
+        enforceConnected();
+        this->getTransport().oneway( command );
+    }
+    AMQ_CATCH_EXCEPTION_CONVERT( transport::CommandIOException, ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( UnsupportedOperationException, ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::syncRequest( transport::Command* command, unsigned int timeout )
+    throw ( ActiveMQException ) {
+
+    try {
+
+        enforceConnected();
+
+        std::auto_ptr<transport::Response> response;
+
+        if( timeout == 0 ) {
+            response.reset( this->getTransport().request( command ) );
+        } else {
+            response.reset( this->getTransport().request( command, timeout ) );
+        }
+
+        commands::ExceptionResponse* exceptionResponse =
+            dynamic_cast<commands::ExceptionResponse*>( response.get() );
+
+        if( exceptionResponse != NULL ) {
+
+            // Create an exception to hold the error information.
+            commands::BrokerError* brokerError =
+                dynamic_cast<commands::BrokerError*>(
+                        exceptionResponse->getException() );
+            BrokerException exception( __FILE__, __LINE__, brokerError );
+
+            // Throw the exception.
+            throw exception;
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( transport::CommandIOException, ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( UnsupportedOperationException, ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::disposeOf( commands::DataStructure* objectId )
+    throw ( ActiveMQException ) {
+
+    try{
+        commands::RemoveInfo command;
+        command.setObjectId( objectId->cloneDataStructure() );
+        oneway( &command );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::disposeOf( commands::DataStructure* objectId,
+                                    unsigned int timeout )
+    throw ( ActiveMQException ) {
+
+    try{
+        commands::RemoveInfo command;
+        command.setObjectId( objectId->cloneDataStructure() );
+        this->syncRequest( &command, timeout );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::enforceConnected() const throw ( ActiveMQException ) {
+    if( this->isClosed() ) {
+        throw ActiveMQException(
+            __FILE__, __LINE__,
+            "ActiveMQConnection::enforceConnected - Not Connected!" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::fire( const exceptions::ActiveMQException& ex ) {
+    if( exceptionListener != NULL ) {
+        try {
+            exceptionListener->onException( ex );
+        }
+        catch(...){}
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const commands::ConnectionInfo* ActiveMQConnection::getConnectionInfo() const
+    throw( exceptions::ActiveMQException ) {
+
+    enforceConnected();
+
+    return &this->connectionInfo;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const commands::ConnectionId* ActiveMQConnection::getConnectionId() const
+    throw( exceptions::ActiveMQException ) {
+
+    enforceConnected();
+
+    return this->connectionInfo.getConnectionId();
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Thu Jan 22 14:55:27 2009
@@ -19,13 +19,17 @@
 #define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
 
 #include <cms/Connection.h>
-#include <cms/ExceptionListener.h>
 #include <activemq/util/Config.h>
-#include <activemq/core/ActiveMQConnectionData.h>
+#include <activemq/core/ActiveMQConnectionSupport.h>
 #include <activemq/core/ActiveMQConnectionMetaData.h>
 #include <activemq/core/ActiveMQMessage.h>
 #include <activemq/core/Dispatcher.h>
-#include <activemq/connector/ConsumerMessageListener.h>
+#include <activemq/commands/ActiveMQTempDestination.h>
+#include <activemq/commands/BrokerInfo.h>
+#include <activemq/commands/ConnectionInfo.h>
+#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/LocalTransactionId.h>
+#include <activemq/commands/WireFormatInfo.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/Map.h>
@@ -38,36 +42,24 @@
 #include <memory>
 
 namespace activemq{
-
-    namespace connector {
-        class ConsumerInfo;
-    }
-
 namespace core{
 
     class ActiveMQSession;
-    class ActiveMQConsumer;
+    class ActiveMQProducer;
 
     /**
      * Concrete connection used for all connectors to the
      * ActiveMQ broker.
      */
-    class AMQCPP_API ActiveMQConnection :
-        public cms::Connection,
-        public connector::ConsumerMessageListener,
-        public cms::ExceptionListener
+    class AMQCPP_API ActiveMQConnection : public cms::Connection,
+                                          public ActiveMQConnectionSupport
     {
     private:
 
         /**
-         * the registered exception listener
-         */
-        cms::ExceptionListener* exceptionListener;
-
-        /**
-         * All the data that is used to connect this Connection
+         * Sync object.
          */
-        ActiveMQConnectionData* connectionData;
+        decaf::util::concurrent::Mutex mutex;
 
         /**
          * The instance of ConnectionMetaData to return to clients.
@@ -91,17 +83,47 @@
         decaf::util::Map< long long, Dispatcher* > dispatchers;
 
         /**
+         * Map of active producers indexed by producer id.
+         */
+        decaf::util::Map< long long, ActiveMQProducer* > activeProducers;
+
+        /**
          * Maintain the set of all active sessions.
          */
         decaf::util::Set<ActiveMQSession*> activeSessions;
 
+        /**
+         * Connection Information for this connection to the Broker
+         */
+        commands::ConnectionInfo connectionInfo;
+
+        /**
+         * the registered exception listener
+         */
+        cms::ExceptionListener* exceptionListener;
+
+        /**
+         * Command sent from the Broker with its BrokerInfo
+         */
+        std::auto_ptr<commands::BrokerInfo> brokerInfo;
+
+        /**
+         * Command sent from the Broker with its WireFormatInfo
+         */
+        std::auto_ptr<commands::WireFormatInfo> brokerWireFormatInfo;
+
     public:
 
         /**
          * Constructor
-         * @param Pointer to an ActiveMQConnectionData object, owned here
+         *
+         * @param transport
+         *        The Transport requested for this connection to the Broker.
+         * @param properties
+         *        The Properties that were defined for this connection
          */
-        ActiveMQConnection( ActiveMQConnectionData* connectionData );
+        ActiveMQConnection( transport::Transport* transport,
+                            decaf::util::Properties* properties );
 
         virtual ~ActiveMQConnection();
 
@@ -113,17 +135,29 @@
         virtual void removeSession( ActiveMQSession* session ) throw ( cms::CMSException );
 
         /**
+         * Adds an active producer to the set of known producers
+         * @param producer - The producer to add to the known set.
+         */
+        virtual void addProducer( ActiveMQProducer* producer ) throw ( cms::CMSException );
+
+        /**
+         * Removes an active producer from the set of known producers
+         * @param producer - The producer to remove from the known set.
+         */
+        virtual void removeProducer( ActiveMQProducer* producer ) throw ( cms::CMSException );
+
+        /**
          * Adds a dispatcher for a consumer.
          * @param consumer - The consumer for which to register a dispatcher.
          * @param dispatcher - The dispatcher to handle incoming messages for the consumer.
          */
-        virtual void addDispatcher( connector::ConsumerInfo* consumer, Dispatcher* dispatcher );
+        virtual void addDispatcher( commands::ConsumerInfo* consumer, Dispatcher* dispatcher );
 
         /**
          * Removes the dispatcher for a consumer.
          * @param consumer - The consumer for which to remove the dispatcher.
          */
-        virtual void removeDispatcher( const connector::ConsumerInfo* consumer );
+        virtual void removeDispatcher( const commands::ConsumerInfo* consumer );
 
         /**
          * If supported sends a message pull request to the service provider asking
@@ -133,7 +167,7 @@
          * @param consumer - the ConsumerInfo for the requesting Consumer.
          * @param timeout - the time that the client is willing to wait.
          */
-        virtual void sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
+        virtual void sendPullRequest( const commands::ConsumerInfo* consumer, long long timeout )
             throw ( exceptions::ActiveMQException );
 
         /**
@@ -207,12 +241,24 @@
         virtual std::string getClientID() const;
 
         /**
-         * Retrieves the Connection Data object for this object.
-         * @return pointer to a connection data object.
+         * Closes this connection as well as any Sessions
+         * created from it (and those Sessions' consumers and
+         * producers).
+         * @throws CMSException
          */
-        virtual ActiveMQConnectionData* getConnectionData(){
-            return connectionData;
-        }
+        virtual void close() throw ( cms::CMSException );
+
+        /**
+         * Starts (or restarts) a connection's delivery of incoming messages
+         * @throws CMSException
+         */
+        virtual void start() throw ( cms::CMSException );
+
+        /**
+         * Stop the flow of incoming messages
+         * @throws CMSException
+         */
+        virtual void stop() throw ( cms::CMSException );
 
         /**
          * Gets the registered Exception Listener for this connection
@@ -229,64 +275,98 @@
             exceptionListener = listener;
         };
 
+    public: // transport::CommandListener
+
         /**
-         * Closes this connection as well as any Sessions
-         * created from it (and those Sessions' consumers and
-         * producers).
-         * @throws CMSException
+         * Event handler for the receipt of a non-response command from the
+         * transport.
+         * @param command the received command object.
          */
-        virtual void close() throw ( cms::CMSException );
+        virtual void onCommand( transport::Command* command );
+
+    public: // TransportExceptionListener
 
         /**
-         * Starts or (restarts) a connections delivery of incoming messages
-         * @throws CMSException
+         * Event handler for an exception from a command transport.
+         * @param source The source of the exception
+         * @param ex The exception.
          */
-        virtual void start() throw ( cms::CMSException );
+        virtual void onTransportException( transport::Transport* source,
+                                           const decaf::lang::Exception& ex );
+
+    public:
 
         /**
-         * Stop the flow of incoming messages
-         * @throws CMSException
+         * Gets the ConnectionInfo for this object; if the Connection is not open
+         * then this method throws an exception.
          */
-        virtual void stop() throw ( cms::CMSException );
+        const commands::ConnectionInfo* getConnectionInfo() const
+            throw( exceptions::ActiveMQException );
 
-    public:     // ExceptionListener interface methods
+        /**
+         * Gets the ConnectionId for this object; if the Connection is not open
+         * then this method throws an exception.
+         */
+        const commands::ConnectionId* getConnectionId() const
+            throw( exceptions::ActiveMQException );
 
         /**
-         * Called when an exception occurs.  Once notified of an exception
-         * the caller should no longer use the resource that generated the
-         * exception.
-         * @param Exception Object that occurred.
+         * Sends a oneway message.
+         * @param command The message to send.
+         * @throws ActiveMQException if not currently connected, or
+         * if the operation fails for any reason.
          */
-        virtual void onException( const cms::CMSException& ex );
+        void oneway( transport::Command* command )
+            throw ( activemq::exceptions::ActiveMQException );
 
-    public:     // ConsumerMessageListener interface methods
+        /**
+         * Sends a synchronous request and returns the response from the broker.
+         * Converts any error responses into an exception.
+         * @param command The request command.
+         * @param timeout The time to wait for a response; zero (the default) means no time limit.
+         * @throws ActiveMQException thrown if an error response was received
+         * from the broker, or if any other error occurred.
+         */
+        void syncRequest( transport::Command* command, unsigned int timeout = 0 )
+            throw ( activemq::exceptions::ActiveMQException );
 
         /**
-         * Called to dispatch a message to a particular consumer.
-         * @param consumer the target consumer of the dispatch.
-         * @param message the message to be dispatched.
-         * @param own If true, it is the responsibility of the callee
-         * to destroy the message object.  Otherwise, the callee must NOT
-         * destroy it.
-         *
+         * Sends a message to the broker to dispose of the given resource
+         * using an async oneway call.
+         * @param objectId The ID of the resource to be released.
+         * @throws ActiveMQException if any problems occur from sending
+         * the message.
          */
-        virtual void onConsumerMessage( connector::ConsumerInfo* consumer,
-                                        core::ActiveMQMessage* message );
+        void disposeOf( commands::DataStructure* objectId )
+            throw ( activemq::exceptions::ActiveMQException );
 
-    public:
+        /**
+         * Sends a message to the broker to dispose of the given resource
+         * using a timed request.
+         * @param objectId The ID of the resource to be released.
+         * @param timeout The time to wait for a response that the object is disposed.
+         * @throws ActiveMQException if any problems occur from sending
+         * the message.
+         */
+        void disposeOf( commands::DataStructure* objectId, unsigned int timeout )
+            throw ( activemq::exceptions::ActiveMQException );
 
         /**
          * Notify the exception listener
          * @param ex the exception to fire
          */
-        void fire( exceptions::ActiveMQException& ex ) {
-            if( exceptionListener != NULL ) {
-                try {
-                    exceptionListener->onException( ex );
-                }
-                catch(...){}
-            }
-        }
+        virtual void fire( const exceptions::ActiveMQException& ex );
+
+    private:
+
+        // Sends the connect message to the broker and waits for the response.
+        void connect() throw ( activemq::exceptions::ActiveMQException );
+
+        // Sends a oneway disconnect message to the broker.
+        void disconnect() throw ( activemq::exceptions::ActiveMQException );
+
+        // Check for Connected State and Throw an exception if not.
+        void enforceConnected() const throw ( activemq::exceptions::ActiveMQException );
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionFactory.cpp Thu Jan 22 14:55:27 2009
@@ -21,7 +21,6 @@
 #include <decaf/util/Properties.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <activemq/exceptions/ExceptionDefines.h>
-#include <activemq/connector/ConnectorFactoryMap.h>
 #include <activemq/transport/TransportRegistry.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/ActiveMQConstants.h>
@@ -31,7 +30,6 @@
 using namespace std;
 using namespace activemq;
 using namespace activemq::core;
-using namespace activemq::connector;
 using namespace activemq::exceptions;
 using namespace activemq::transport;
 using namespace decaf;
@@ -102,8 +100,6 @@
     // Declared here so that they can be deleted in the catch block
     auto_ptr<Properties> properties( new Properties() );
     auto_ptr<Transport> transport;
-    auto_ptr<Connector> connector;
-    auto_ptr<ActiveMQConnectionData> connectionData;
     auto_ptr<ActiveMQConnection> connection;
     std::string clientIdLocal = clientId;
 
@@ -142,40 +138,9 @@
                 "failed creating new Transport" );
         }
 
-        // What wire format are we using, defaults to Stomp
-        std::string wireFormat =
-            properties->getProperty( "wireFormat", "openwire" );
-
-        // Now try and find a factory to create the Connector
-        ConnectorFactory* connectorfactory =
-            ConnectorFactoryMap::getInstance()->lookup( wireFormat );
-
-        if( connectorfactory == NULL ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConnectionFactory::createConnection - "
-                "Connector for Wire Format not registered in Map" );
-        }
-
-        // Create the Connector.
-        connector.reset( connectorfactory->createConnector( *properties, transport.get() ) );
-
-        if( connector.get() == NULL ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConnectionFactory::createConnection - "
-                "Failed to Create the Connector" );
-        }
-
-        // Start the Connector
-        connector->start();
-
-        // Create Holder and store the data for the Connection
-        connectionData.reset( new ActiveMQConnectionData(
-            connector.release(), transport.release(), properties.release() ) );
-
         // Create and Return the new connection object.
-        connection.reset( new ActiveMQConnection( connectionData.release() ) );
+        connection.reset(
+            new ActiveMQConnection( transport.release(), properties.release() ) );
 
         return connection.release();
     }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp Thu Jan 22 14:55:27 2009
@@ -34,7 +34,7 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQConnectionSupport::ActiveMQConnectionSupport( transport::Transport* transport,
-                                                      const decaf::util::Properties& properties ) {
+                                                      decaf::util::Properties* properties ) {
 
     if( transport  == NULL ) {
         throw decaf::lang::exceptions::IllegalArgumentException(
@@ -43,47 +43,44 @@
             "Required Parameter 'transport' was NULL.");
     }
 
-    this->properties = properties;
-    this->transport = transport;
-
-    // Start the Transport
-    this->transport->start();
+    this->properties.reset( properties );
+    this->transport.reset( transport );
 
     // Check the connection options
     this->setAlwaysSyncSend( Boolean::parseBoolean(
-        properties.getProperty(
+        properties->getProperty(
             core::ActiveMQConstants::toString(
                 core::ActiveMQConstants::CONNECTION_ALWAYSSYNCSEND ), "false" ) ) );
 
     this->setUseAsyncSend( Boolean::parseBoolean(
-        properties.getProperty(
+        properties->getProperty(
             core::ActiveMQConstants::toString(
                 core::ActiveMQConstants::CONNECTION_USEASYNCSEND ), "false" ) ) );
 
     this->setProducerWindowSize( decaf::lang::Integer::parseInt(
-        properties.getProperty(
+        properties->getProperty(
             core::ActiveMQConstants::toString(
                 core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE ), "0" ) ) );
 
     this->setSendTimeout( decaf::lang::Integer::parseInt(
-        properties.getProperty(
+        properties->getProperty(
             core::ActiveMQConstants::toString(
                 core::ActiveMQConstants::CONNECTION_SENDTIMEOUT ), "0" ) ) );
 
     this->setCloseTimeout( decaf::lang::Integer::parseInt(
-        properties.getProperty(
+        properties->getProperty(
             core::ActiveMQConstants::toString(
                 core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT ), "15000" ) ) );
 
-    this->setClientId( properties.getProperty(
+    this->setClientId( properties->getProperty(
         core::ActiveMQConstants::toString(
             core::ActiveMQConstants::PARAM_CLIENTID ), "" ) );
 
-    this->setUsername( properties.getProperty(
+    this->setUsername( properties->getProperty(
         core::ActiveMQConstants::toString(
             core::ActiveMQConstants::PARAM_USERNAME ), "" ) );
 
-    this->setPassword( properties.getProperty(
+    this->setPassword( properties->getProperty(
         core::ActiveMQConstants::toString(
             core::ActiveMQConstants::PARAM_PASSWORD ), "" ) );
 }
@@ -91,21 +88,38 @@
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQConnectionSupport::~ActiveMQConnectionSupport() {
     try{
-        this->close();
+        this->shutdownTransport();
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnectionSupport::close() throw( decaf::lang::Exception ) {
+void ActiveMQConnectionSupport::startupTransport() throw( decaf::lang::Exception ) {
+    try {
+
+        if( this->transport.get()  == NULL ) {
+            throw decaf::lang::exceptions::IllegalArgumentException(
+                __FILE__, __LINE__,
+                "ActiveMQConnectionSupport::startupTransport - "
+                "Required Object 'transport' was NULL.");
+        }
+
+        this->transport->start();
+    }
+    AMQ_CATCH_RETHROW( decaf::lang::Exception )
+    AMQ_CATCHALL_THROW( decaf::lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionSupport::shutdownTransport() throw( decaf::lang::Exception ) {
 
     bool hasException = false;
     exceptions::ActiveMQException e;
 
     try {
 
-        if( transport != NULL ){
+        if( transport.get() != NULL ){
 
             try{
                 transport->close();
@@ -118,7 +132,7 @@
             }
 
             try{
-                delete transport;
+                transport.reset( NULL );
             }catch( exceptions::ActiveMQException& ex ){
                 if( !hasException ){
                     hasException = true;
@@ -126,8 +140,6 @@
                     e = ex;
                 }
             }
-
-            transport = NULL;
         }
 
         // If we encountered an exception - throw the first one we encountered.
@@ -136,6 +148,6 @@
             throw e;
         }
     }
-    AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
+    AMQ_CATCH_RETHROW( decaf::lang::Exception )
+    AMQ_CATCHALL_THROW( decaf::lang::Exception )
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h Thu Jan 22 14:55:27 2009
@@ -18,28 +18,34 @@
 #ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONSUPPORT_H_
 #define _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONSUPPORT_H_
 
+#include <cms/ExceptionListener.h>
+
 #include <activemq/util/Config.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/TransportExceptionListener.h>
+#include <activemq/transport/CommandListener.h>
+#include <activemq/util/LongSequenceGenerator.h>
 
-#include <decaf/io/Closeable.h>
 #include <decaf/util/Properties.h>
 #include <decaf/lang/Exception.h>
 
-#include <activemq/transport/Transport.h>
-#include <activemq/util/LongSequenceGenerator.h>
-
 #include <memory>
 
 namespace activemq {
 namespace core {
 
-    class AMQCPP_API ActiveMQConnectionSupport : public decaf::io::Closeable {
+    class AMQCPP_API ActiveMQConnectionSupport :
+        public transport::CommandListener,
+        public transport::TransportExceptionListener
+    {
     private:
 
         // Properties used to configure this connection.
-        decaf::util::Properties properties;
+        std::auto_ptr<decaf::util::Properties> properties;
 
         // Transport we are using
-        transport::Transport* transport;
+        std::auto_ptr<transport::Transport> transport;
 
         /**
          * Boolean indicating that we are to always send message Synchronously.
@@ -129,16 +135,30 @@
          *        The URI configured properties for this connection.
          */
         ActiveMQConnectionSupport( transport::Transport* transport,
-                                  const decaf::util::Properties& properties );
+                                   decaf::util::Properties* properties );
 
         virtual ~ActiveMQConnectionSupport();
 
         /**
+         * Starts the Transport; this should initiate the connection between
+         * this client and the Transport's endpoint.
+         * @throws Exception
+         */
+        virtual void startupTransport() throw( decaf::lang::Exception );
+
+        /**
+         * Shuts down the Transport and deallocates the appropriate resources.
+         * The object is generally no longer usable after this call.
+         * @throws Exception
+         */
+        virtual void shutdownTransport() throw( decaf::lang::Exception );
+
+        /**
          * Gets the Properties object that this Config object was initialized with.
          * @returns a const reference to the Connection Config.
          */
         const decaf::util::Properties& getProperties() const {
-            return this->properties;
+            return *( this->properties.get() );
         }
 
         /**
@@ -146,7 +166,7 @@
          * @return the configured transport
          */
         transport::Transport& getTransport() const {
-            return *( this->transport );
+            return *( this->transport.get() );
         }
 
         /**
@@ -332,15 +352,6 @@
             return this->tempDestinationIds.getNextSequenceId();
         }
 
-    public:  // decaf::io::Closeable
-
-        /**
-         * Closes this object and deallocates the appropriate resources.
-         * The object is generally no longer usable after calling close.
-         * @throws CMSException
-         */
-        virtual void close() throw( decaf::lang::Exception );
-
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h Thu Jan 22 14:55:27 2009
@@ -32,6 +32,33 @@
     class AMQCPP_API ActiveMQConstants{
     public:
 
+        // Flags to indicate Transaction States.
+        enum TransactionState {
+            TRANSACTION_STATE_BEGIN = 0,
+            TRANSACTION_STATE_PREPARE = 1,
+            TRANSACTION_STATE_COMMITONEPHASE = 2,
+            TRANSACTION_STATE_COMMITTWOPHASE = 3,
+            TRANSACTION_STATE_ROLLBACK = 4,
+            TRANSACTION_STATE_RECOVER = 5,
+            TRANSACTION_STATE_FORGET = 6,
+            TRANSACTION_STATE_END = 7
+        };
+
+        // Flags to be applied when sending the Destination Info Command.
+        enum DestinationActions {
+            DESTINATION_ADD_OPERATION = 0,
+            DESTINATION_REMOVE_OPERATION = 1
+        };
+
+        // Represents the Acknowledgement types that are supported for the
+        // Message Ack Command.
+        enum AckType {
+            ACK_TYPE_DELIVERED = 0,  // Message delivered but not consumed
+            ACK_TYPE_POISON    = 1,  // Message could not be processed due to
+                                     // poison pill but discard anyway
+            ACK_TYPE_CONSUMED  = 2   // Message consumed, discard
+        };
+
         /**
          * These values represent the options that can be appended to an
          * Destination name, i.e. /topic/foo?consumer.exclusive=true

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp Thu Jan 22 14:55:27 2009
@@ -22,9 +22,15 @@
 #include <decaf/util/Date.h>
 #include <activemq/util/Config.h>
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/commands/Message.h>
+#include <activemq/commands/MessageAck.h>
+#include <activemq/commands/MessagePull.h>
+#include <activemq/commands/TransactionInfo.h>
+#include <activemq/commands/TransactionId.h>
 #include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConstants.h>
 #include <activemq/core/ActiveMQSession.h>
-#include <activemq/core/ActiveMQTransaction.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/ActiveMQMessage.h>
 #include <cms/ExceptionListener.h>
 
@@ -32,7 +38,6 @@
 using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
-using namespace activemq::connector;
 using namespace activemq::exceptions;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
@@ -40,9 +45,9 @@
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConsumer::ActiveMQConsumer( connector::ConsumerInfo* consumerInfo,
+ActiveMQConsumer::ActiveMQConsumer( commands::ConsumerInfo* consumerInfo,
                                     ActiveMQSession* session,
-                                    ActiveMQTransaction* transaction ) {
+                                    ActiveMQTransactionContext* transaction ) {
 
     if( session == NULL || consumerInfo == NULL ) {
         throw ActiveMQException(
@@ -53,12 +58,13 @@
     // Init Producer Data
     this->session = session;
     this->transaction = transaction;
-    this->consumerInfo = consumerInfo;
+    this->consumerInfo.reset( consumerInfo );
     this->listener = NULL;
     this->closed = false;
 
+    // TODO - How to Detect Close
     // Listen for our resource to close
-    this->consumerInfo->addListener( this );
+    //this->consumerInfo->addListener( this );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -66,7 +72,6 @@
 
     try {
         close();
-        delete consumerInfo;
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
@@ -84,20 +89,24 @@
             bool haveException = false;
             ActiveMQException error;
 
+            // TODO
             // Close the ConsumerInfo
-            if( !consumerInfo->isClosed() ) {
-                try{
-                    // We don't want a callback now
-                    this->consumerInfo->removeListener( this );
-                    this->consumerInfo->close();
-                } catch( ActiveMQException& ex ){
-                    if( !haveException ){
-                        ex.setMark( __FILE__, __LINE__ );
-                        error = ex;
-                        haveException = true;
-                    }
-                }
-            }
+//            if( !consumerInfo->isClosed() ) {
+//                try{
+//                    // We don't want a callback now
+//                    this->consumerInfo->removeListener( this );
+//                    this->consumerInfo->close();
+//                } catch( ActiveMQException& ex ){
+//                    if( !haveException ){
+//                        ex.setMark( __FILE__, __LINE__ );
+//                        error = ex;
+//                        haveException = true;
+//                    }
+//                }
+//            }
+
+            // Remove from Broker.
+            this->session->getConnection()->disposeOf( this->consumerInfo->getConsumerId() );
 
             closed = true;
 
@@ -135,7 +144,7 @@
 
     try {
         // Fetch the Selector
-        return consumerInfo->getMessageSelector();
+        return consumerInfo->getSelector();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -148,11 +157,7 @@
 
     try {
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::receive - This Consumer is closed" );
-        }
+        this->checkClosed();
 
         synchronized( &unconsumedMessages ) {
 
@@ -218,11 +223,7 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::receive - This Consumer is closed" );
-        }
+        this->checkClosed();
 
         // Send a request for a new message if needed
         this->sendPullRequest( 0 );
@@ -257,11 +258,7 @@
 
     try {
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::receive - This Consumer is closed" );
-        }
+        this->checkClosed();
 
         // Send a request for a new message if needed
         this->sendPullRequest( millisecs );
@@ -296,11 +293,7 @@
 
     try {
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::receive - This Consumer is closed" );
-        }
+        this->checkClosed();
 
         // Send a request for a new message if needed
         this->sendPullRequest( -1 );
@@ -334,11 +327,7 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::receive - This Consumer is closed" );
-        }
+        this->checkClosed();
 
         this->listener = listener;
 
@@ -393,8 +382,9 @@
         // transaction so that there is a copy to commit if commit is called in
         // the async onMessage method and also so we know that our copy can
         // be deleted.
-        transaction->addToTransaction(
-            dynamic_cast<cms::Message*>( message )->clone(), this );
+// TODO
+//        transaction->addToTransaction(
+//            dynamic_cast<cms::Message*>( message )->clone(), this );
     }
 }
 
@@ -405,7 +395,7 @@
     try{
 
         if( session->isAutoAcknowledge() || messageExpired ) {
-            this->acknowledge( message, Connector::ACK_TYPE_CONSUMED );
+            this->acknowledge( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
         }
 
         // The Message is cleaned up here.
@@ -422,14 +412,10 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::receive - This Consumer is closed" );
-        }
+        this->checkClosed();
 
         // Send an ack indicating that the client has consumed the message
-        this->acknowledge( message, Connector::ACK_TYPE_CONSUMED );
+        this->acknowledge( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -442,20 +428,43 @@
 
     try{
 
-        if( closed ) {
+        this->checkClosed();
+
+        const commands::Message* amqMessage =
+            dynamic_cast<const commands::Message*>( message );
+
+        if( amqMessage == NULL ) {
             throw ActiveMQException(
                 __FILE__, __LINE__,
-                "ActiveMQConsumer::receive - This Consumer is closed" );
+                "ActiveMQConsumer::acknowledge - "
+                "Message was not a commands::Message derivation.");
         }
 
-        // Delegate the Ack to the Session.
-        // Delegate to connector to ack this message.
-        session->getConnection()->getConnectionData()->
-            getConnector()->acknowledge(
-                session->getSessionInfo(),
-                this->getConsumerInfo(),
-                dynamic_cast<const cms::Message*>( message ),
-                (Connector::AckType)ackType );
+        commands::MessageAck ack;
+        ack.setAckType( (int)ackType );
+        ack.setConsumerId( consumerInfo->getConsumerId()->cloneDataStructure() );
+        ack.setDestination( amqMessage->getDestination()->cloneDataStructure() );
+        ack.setFirstMessageId( amqMessage->getMessageId()->cloneDataStructure() );
+        ack.setLastMessageId( amqMessage->getMessageId()->cloneDataStructure() );
+        ack.setMessageCount( 1 );
+
+        if( this->session->getAcknowledgeMode() == cms::Session::SESSION_TRANSACTED ) {
+
+            if( this->transaction == NULL ||
+                this->transaction->getTransactionInfo() == NULL ||
+                this->transaction->getTransactionInfo()->getTransactionId() == NULL ) {
+
+                throw ActiveMQException(
+                        __FILE__, __LINE__,
+                        "ActiveMQConsumer::acknowledge - "
+                        "Transacted Session, has no Transaction Info.");
+            }
+
+            ack.setTransactionId(
+                this->transaction->getTransactionInfo()->getTransactionId()->cloneDataStructure() );
+        }
+
+        this->session->oneway( &ack );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -471,7 +480,7 @@
 
         // Don't dispatch expired messages, ack it and then destroy it
         if( message->isExpired() ) {
-            this->acknowledge( message, Connector::ACK_TYPE_CONSUMED );
+            this->acknowledge( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
             delete message;
 
             // stop now, don't queue
@@ -524,16 +533,28 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::sendPullRequest( long long timeout )
-    throw ( ActiveMQException ) {
+    throw ( activemq::exceptions::ActiveMQException ) {
 
     try {
 
+        this->checkClosed();
+
         // There are still local message, consume them first.
-        if( !unconsumedMessages.empty() ) {
+        if( !this->unconsumedMessages.empty() ) {
             return;
         }
 
-        this->session->sendPullRequest( this->consumerInfo, timeout );
+        if( this->consumerInfo->getPrefetchSize() == 0 ) {
+
+            commands::MessagePull messagePull;
+            messagePull.setConsumerId(
+                this->consumerInfo->getConsumerId()->cloneDataStructure() );
+            messagePull.setDestination(
+                this->consumerInfo->getDestination()->cloneDataStructure() );
+            messagePull.setTimeout( timeout );
+
+            this->session->oneway( &messagePull );
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -541,29 +562,10 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::onConnectorResourceClosed(
-    const ConnectorResource* resource ) throw ( cms::CMSException ) {
-
-    try{
-
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::onConnectorResourceClosed - "
-                "Producer Already Closed");
-        }
-
-        if( resource != consumerInfo ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::onConnectorResourceClosed - "
-                "Unknown object passed to this callback");
-        }
-
-        // If our producer isn't closed already, then lets close
-        this->close();
+void ActiveMQConsumer::checkClosed() throw( exceptions::ActiveMQException ) {
+    if( closed ) {
+        throw ActiveMQException(
+            __FILE__, __LINE__,
+            "ActiveMQConsumer - Consumer Already Closed" );
     }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h Thu Jan 22 14:55:27 2009
@@ -24,25 +24,24 @@
 
 #include <activemq/util/Config.h>
 #include <activemq/exceptions/ActiveMQException.h>
-#include <activemq/connector/ConsumerInfo.h>
-#include <activemq/connector/ConnectorResourceListener.h>
+#include <activemq/commands/ConsumerInfo.h>
 #include <activemq/core/ActiveMQAckHandler.h>
 #include <activemq/core/Dispatcher.h>
 
 #include <decaf/util/Queue.h>
 #include <decaf/util/concurrent/Mutex.h>
+#include <memory>
 
 namespace activemq{
 namespace core{
 
     class ActiveMQSession;
-    class ActiveMQTransaction;
+    class ActiveMQTransactionContext;
 
     class AMQCPP_API ActiveMQConsumer :
         public cms::MessageConsumer,
         public ActiveMQAckHandler,
-        public Dispatcher,
-        public connector::ConnectorResourceListener
+        public Dispatcher
     {
     private:
 
@@ -54,12 +53,12 @@
         /**
          * The Transaction Context, null if not in a Transacted Session.
          */
-        ActiveMQTransaction* transaction;
+        ActiveMQTransactionContext* transaction;
 
         /**
          * The Consumer info for this Consumer
          */
-        connector::ConsumerInfo* consumerInfo;
+        std::auto_ptr<commands::ConsumerInfo> consumerInfo;
 
         /**
          * The Message Listener for this Consumer
@@ -72,6 +71,11 @@
         decaf::util::Queue<DispatchData> unconsumedMessages;
 
         /**
+         * Queue of consumed messages.
+         */
+        decaf::util::Queue<cms::Message*> dispatchedMessages;
+
+        /**
          * Boolean that indicates if the consumer has been closed
          */
         bool closed;
@@ -81,9 +85,9 @@
         /**
          * Constructor
          */
-        ActiveMQConsumer( connector::ConsumerInfo* consumerInfo,
+        ActiveMQConsumer( commands::ConsumerInfo* consumerInfo,
                           ActiveMQSession* session,
-                          ActiveMQTransaction* transaction );
+                          ActiveMQTransactionContext* transaction );
 
         virtual ~ActiveMQConsumer();
 
@@ -178,33 +182,10 @@
          * Get the Consumer information for this consumer
          * @return Pointer to a Consumer Info Object
          */
-        virtual connector::ConsumerInfo* getConsumerInfo() {
-            return consumerInfo;
+        virtual commands::ConsumerInfo* getConsumerInfo() {
+            return consumerInfo.get();
         }
 
-        /**
-         * If supported sends a message pull request to the service provider asking
-         * for the delivery of a new message.  This is used in the case where the
-         * service provider has been configured with a zero prefectch or is only
-         * capable of delivering messages on a pull basis.  No request is made if
-         * there are already messages in the uncomsumed queue since there's no need
-         * for a server round-trip in that instance.
-         * @param timeout - the time that the client is willing to wait.
-         */
-        virtual void sendPullRequest( long long timeout )
-            throw ( exceptions::ActiveMQException );
-
-    protected:   // ConnectorResourceListener
-
-        /**
-         * When a Connector Resouce is closed it will notify any registered
-         * Listeners of its close so that they can take the appropriate
-         * action.
-         * @param resource - The ConnectorResource that was closed.
-         */
-        virtual void onConnectorResourceClosed(
-            const connector::ConnectorResource* resource ) throw ( cms::CMSException );
-
     protected:
 
         /**
@@ -226,7 +207,7 @@
          * @throws InvalidStateException if this consumer is closed upon
          * entering this method.
          */
-        ActiveMQMessage* dequeue(int timeout) throw ( cms::CMSException );
+        ActiveMQMessage* dequeue( int timeout ) throw ( cms::CMSException );
 
         /**
          * Pre-consume processing
@@ -241,6 +222,23 @@
          */
         virtual void afterMessageIsConsumed( ActiveMQMessage* message, bool messageExpired );
 
+    private:
+
+        /**
+         * If supported, sends a message pull request to the service provider asking
+         * for the delivery of a new message.  This is used in the case where the
+         * service provider has been configured with a zero prefetch or is only
+         * capable of delivering messages on a pull basis.  No request is made if
+         * there are already messages in the unconsumed queue since there's no need
+         * for a server round-trip in that instance.
+         * @param timeout - the time that the client is willing to wait.
+         */
+        virtual void sendPullRequest( long long timeout )
+            throw ( exceptions::ActiveMQException );
+
+        // Checks for the closed state and throws if so.
+        void checkClosed() throw( exceptions::ActiveMQException );
+
     };
 
 }}



Mime
View raw message