activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r426431 [4/14] - in /incubator/activemq/branches/activemq-4.0: activemq-core/src/gram/script/ activemq-core/src/main/java/org/apache/activemq/kaha/impl/ activemq-core/src/main/java/org/apache/activemq/openwire/v1/ activemq-core/src/test/jav...
Date Fri, 28 Jul 2006 08:22:55 GMT
Modified: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp?rev=426431&r1=426430&r2=426431&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp (original)
+++ incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp Fri Jul 28 01:22:48 2006
@@ -1,51 +1,51 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ActiveMQ_IDataStructure_hpp_
-#define ActiveMQ_IDataStructure_hpp_
-
-#include "ppr/io/IOutputStream.hpp"
-#include "ppr/io/IInputStream.hpp"
-#include "ppr/util/ifr/p"
-
-namespace apache
-{
-  namespace activemq
-  {
-    namespace protocol
-    {
-      struct IMarshaller ;
-    }
-    using namespace ifr;
-    using namespace apache::activemq::protocol;
-    using namespace apache::ppr::io;
-
-/*
- * An OpenWire data structure.
- */
-struct IDataStructure : Interface
-{
-    virtual unsigned char getDataStructureType() = 0 ;
-    virtual bool isMarshallAware() = 0 ;
-    virtual int marshal(p<IMarshaller> marshaller, int mode, p<IOutputStream> writer) = 0 ;
-    virtual void unmarshal(p<IMarshaller> marshaller, int mode, p<IInputStream> reader) = 0 ;
-} ;
-
-/* namespace */
-  }
-}
-
-#endif /*ActiveMQ_IDataStructure_hpp_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_IDataStructure_hpp_
+#define ActiveMQ_IDataStructure_hpp_
+
+#include "ppr/io/IOutputStream.hpp"
+#include "ppr/io/IInputStream.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    namespace protocol
+    {
+      struct IMarshaller ;
+    }
+    using namespace ifr;
+    using namespace apache::activemq::protocol;
+    using namespace apache::ppr::io;
+
+/*
+ * An OpenWire data structure.
+ */
+struct IDataStructure : Interface
+{
+    virtual unsigned char getDataStructureType() = 0 ;
+    virtual bool isMarshallAware() = 0 ;
+    virtual int marshal(p<IMarshaller> marshaller, int mode, p<IOutputStream> writer) = 0 ;
+    virtual void unmarshal(p<IMarshaller> marshaller, int mode, p<IInputStream> reader) = 0 ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_IDataStructure_hpp_*/

Propchange: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp?rev=426431&r1=426430&r2=426431&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp (original)
+++ incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp Fri Jul 28 01:22:48 2006
@@ -1,47 +1,47 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ActiveMQ_ISynchronization_hpp_
-#define ActiveMQ_ISynchronization_hpp_
-
-#include "ppr/util/ifr/p"
-
-namespace apache
-{
-  namespace activemq
-  {
-    using namespace ifr;
-
-/*
- * 
- */
-struct ISynchronization : Interface
-{
-    // Called before a commit
-    virtual void beforeCommit() = 0 ;
-    
-    // Called after a commit
-    virtual void afterCommit() = 0 ;
-    
-    // Called after a transaction rollback
-    virtual void afterRollback() = 0 ;
-} ;
-
-/* namespace */
-  }
-}
-
-#endif /*ActiveMQ_ISynchronization_hpp_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ISynchronization_hpp_
+#define ActiveMQ_ISynchronization_hpp_
+
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace ifr;
+
+/*
+ * 
+ */
+struct ISynchronization : Interface
+{
+    // Called before a commit
+    virtual void beforeCommit() = 0 ;
+    
+    // Called after a commit
+    virtual void afterCommit() = 0 ;
+    
+    // Called after a transaction rollback
+    virtual void afterRollback() = 0 ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_ISynchronization_hpp_*/

Propchange: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp?rev=426431&r1=426430&r2=426431&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp (original)
+++ incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp Fri Jul 28 01:22:48 2006
@@ -1,33 +1,33 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ActiveMQ_MessageAckType_hpp_
-#define ActiveMQ_MessageAckType_hpp_
-
-namespace apache
-{
-  namespace activemq
-  {
-    enum MessageAckType
-    {
-      DeliveredAck = 0,
-      PoisonAck    = 1,
-      ConsumedAck  = 2
-    } ;
-  }
-}
-
-#endif /*ActiveMQ_MessageAckType_hpp_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageAckType_hpp_
+#define ActiveMQ_MessageAckType_hpp_
+
+namespace apache
+{
+  namespace activemq
+  {
+    enum MessageAckType
+    {
+      DeliveredAck = 0,
+      PoisonAck    = 1,
+      ConsumedAck  = 2
+    } ;
+  }
+}
+
+#endif /*ActiveMQ_MessageAckType_hpp_*/

Propchange: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp?rev=426431&r1=426430&r2=426431&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp (original)
+++ incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp Fri Jul 28 01:22:48 2006
@@ -1,307 +1,307 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/MessageConsumer.hpp"
-#include "activemq/Session.hpp"
-
-using namespace apache::activemq;
-
-/*
- * 
- */
-MessageConsumer::MessageConsumer(p<Session> session, p<ConsumerInfo> consumerInfo, AcknowledgementMode acknowledgementMode)
-{
-    this->session                = session ;
-    this->consumerInfo           = consumerInfo ;
-    this->acknowledgementMode    = acknowledgementMode ;
-    this->dispatcher             = new Dispatcher() ;
-    this->listener               = NULL ;
-    this->closed                 = false ;
-    this->maximumRedeliveryCount = 10 ;
-    this->redeliveryTimeout      = 500 ;
-}
-
-/*
- *
- */
-MessageConsumer::~MessageConsumer()
-{
-    // Make sure consumer is closed
-    close() ;
-}
-
-// Attribute methods ------------------------------------------------
-
-/*
- *
- */
-void MessageConsumer::setMessageListener(p<IMessageListener> listener)
-{
-    this->listener = listener ;
-}
-
-/*
- *
- */
-p<IMessageListener> MessageConsumer::getMessageListener()
-{
-    return listener ;
-}
-
-/*
- *
- */
-p<ConsumerId> MessageConsumer::getConsumerId()
-{
-    return consumerInfo->getConsumerId() ;
-}
-
-/*
- *
- */
-void MessageConsumer::setMaximumRedeliveryCount(int count)
-{
-    this->maximumRedeliveryCount = count ;
-}
-
-/*
- *
- */
-int MessageConsumer::getMaximumRedeliveryCount()
-{
-    return maximumRedeliveryCount ;
-}
-
-/*
- *
- */
-void MessageConsumer::setRedeliveryTimeout(int timeout)
-{
-    this->redeliveryTimeout = timeout ;
-}
-
-/*
- *
- */
-int MessageConsumer::getRedeliveryTimeout()
-{
-    return redeliveryTimeout ;
-}
-
-
-// Operation methods ------------------------------------------------
-
-/*
- *
- */
-p<IMessage> MessageConsumer::receive()
-{
-    checkClosed() ;
-    return autoAcknowledge( dispatcher->dequeue() ) ;
-}
-
-/*
- *
- */
-p<IMessage> MessageConsumer::receive(int timeout)
-{
-    checkClosed() ;
-    return autoAcknowledge( dispatcher->dequeue(timeout) ) ;
-}
-
-/*
- *
- */
-p<IMessage> MessageConsumer::receiveNoWait()
-{
-    checkClosed() ;
-    return autoAcknowledge( dispatcher->dequeueNoWait() ) ;
-}
-
-/*
- *
- */
-void MessageConsumer::redeliverRolledBackMessages()
-{
-    dispatcher->redeliverRolledBackMessages() ;
-}
-
-/*
- * Transport callback that handles messages dispatching
- */
-void MessageConsumer::dispatch(p<IMessage> message)
-{
-    dispatcher->enqueue(message) ;
-
-    // Activate background dispatch thread if async listener is set up
-    if( listener != NULL )
-        session->dispatch() ;
-}
-
-/*
- *
- */
-void MessageConsumer::dispatchAsyncMessages()
-{
-    while( listener != NULL )
-    {
-        p<IMessage> message = dispatcher->dequeueNoWait() ;
-
-        if( message != NULL )
-        {
-            listener->onMessage(message) ;
-
-            // Auto acknowledge message if selected
-            autoAcknowledge(message) ;
-        }
-        else
-            break ;
-    }
-}
-
-/*
- * IAcknowledger callback method.
- */
-void MessageConsumer::acknowledge(p<ActiveMQMessage> message)
-{
-    doClientAcknowledge(message) ;
-}
-
-/*
- * 
- */
-void MessageConsumer::close()
-{
-    if( !closed )
-    {
-        closed = true ;
-    
-        // De-register consumer from broker
-        session->getConnection()->disposeOf( consumerInfo->getConsumerId() ) ;
-
-        // Reset internal state (prevent cyclic references)
-        session = NULL ;
-    }
-}
-
-
-// Implementation methods ------------------------------------------------
-
-/*
- *
- */
-void MessageConsumer::checkClosed() throw(CmsException) 
-{
-    if( closed )
-        throw ConnectionClosedException("Oops! Connection already closed") ;
-}
-
-/*
- *
- */
-p<IMessage> MessageConsumer::autoAcknowledge(p<IMessage> message)
-{
-    try
-    {
-        // Is the message an ActiveMQMessage? (throws bad_cast otherwise)
-        p<ActiveMQMessage> activeMessage = p_dyncast<ActiveMQMessage> (message) ;
-
-        // Register the handler for client acknowledgment
-        activeMessage->setAcknowledger( smartify(this) ) ;
-
-        if( acknowledgementMode != ClientAckMode )
-            doAcknowledge(activeMessage) ;
-    }
-    catch( bad_cast& bc )
-    {
-        // ignore
-    }
-    return message ;
-}
-
-/*
- *
- */
-void MessageConsumer::doClientAcknowledge(p<ActiveMQMessage> message)
-{
-    if( acknowledgementMode == ClientAckMode )
-        doAcknowledge(message);
-}
-
-/*
- *
- */
-void MessageConsumer::doAcknowledge(p<Message> message)
-{
-    p<MessageAck> ack = createMessageAck(message) ;
-    //cout << "Sending Ack: " << ack->getAckType() << endl ;
-    session->getConnection()->syncRequest(ack) ;
-}
-
-/*
- *
- */
-p<MessageAck> MessageConsumer::createMessageAck(p<Message> message)
-{
-    p<MessageAck> ack = new MessageAck() ;
-
-    // Set ack properties
-    ack->setAckType( ConsumedAck ) ;
-    ack->setConsumerId( consumerInfo->getConsumerId() ) ;
-    ack->setDestination( message->getDestination() ) ;
-    ack->setFirstMessageId( message->getMessageId() ) ;
-    ack->setLastMessageId( message->getMessageId() ) ;
-    ack->setMessageCount( 1 ) ;
-    
-    if( session->isTransacted() )
-    {
-        session->doStartTransaction() ;
-        ack->setTransactionId( session->getTransactionContext()->getTransactionId() ) ;
-        session->getTransactionContext()->addSynchronization( new MessageConsumerSynchronization(smartify(this), message) ) ;
-    }
-    return ack ;
-}
-
-/*
- *
- */
-void MessageConsumer::afterRollback(p<ActiveMQMessage> message)
-{
-    // Try redeliver of the message again
-    message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 ) ;
-
-    // Check if redeliver count has exceeded maximum
-    if( message->getRedeliveryCounter() > maximumRedeliveryCount )
-    {
-        // Send back a poisoned pill
-        p<MessageAck> ack = new MessageAck() ;
-        ack->setAckType( PoisonAck ) ;
-        ack->setConsumerId( consumerInfo->getConsumerId() ) ;
-        ack->setDestination( message->getDestination() ) ;
-        ack->setFirstMessageId( message->getMessageId() ) ;
-        ack->setLastMessageId( message->getMessageId() ) ;
-        ack->setMessageCount( 1 ) ;
-        session->getConnection()->oneway(ack) ;
-    }
-    else
-    {
-        dispatcher->redeliver(message) ;
-        
-        // Re-dispatch the message at some point in the future
-        if( listener != NULL )
-            session->dispatch( redeliveryTimeout ) ;
-    }
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/MessageConsumer.hpp"
+#include "activemq/Session.hpp"
+
+using namespace apache::activemq;
+
+/*
+ * 
+ */
+MessageConsumer::MessageConsumer(p<Session> session, p<ConsumerInfo> consumerInfo, AcknowledgementMode acknowledgementMode)
+{
+    this->session                = session ;
+    this->consumerInfo           = consumerInfo ;
+    this->acknowledgementMode    = acknowledgementMode ;
+    this->dispatcher             = new Dispatcher() ;
+    this->listener               = NULL ;
+    this->closed                 = false ;
+    this->maximumRedeliveryCount = 10 ;
+    this->redeliveryTimeout      = 500 ;
+}
+
+/*
+ *
+ */
+MessageConsumer::~MessageConsumer()
+{
+    // Make sure consumer is closed
+    close() ;
+}
+
+// Attribute methods ------------------------------------------------
+
+/*
+ *
+ */
+void MessageConsumer::setMessageListener(p<IMessageListener> listener)
+{
+    this->listener = listener ;
+}
+
+/*
+ *
+ */
+p<IMessageListener> MessageConsumer::getMessageListener()
+{
+    return listener ;
+}
+
+/*
+ *
+ */
+p<ConsumerId> MessageConsumer::getConsumerId()
+{
+    return consumerInfo->getConsumerId() ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::setMaximumRedeliveryCount(int count)
+{
+    this->maximumRedeliveryCount = count ;
+}
+
+/*
+ *
+ */
+int MessageConsumer::getMaximumRedeliveryCount()
+{
+    return maximumRedeliveryCount ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::setRedeliveryTimeout(int timeout)
+{
+    this->redeliveryTimeout = timeout ;
+}
+
+/*
+ *
+ */
+int MessageConsumer::getRedeliveryTimeout()
+{
+    return redeliveryTimeout ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::receive()
+{
+    checkClosed() ;
+    return autoAcknowledge( dispatcher->dequeue() ) ;
+}
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::receive(int timeout)
+{
+    checkClosed() ;
+    return autoAcknowledge( dispatcher->dequeue(timeout) ) ;
+}
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::receiveNoWait()
+{
+    checkClosed() ;
+    return autoAcknowledge( dispatcher->dequeueNoWait() ) ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::redeliverRolledBackMessages()
+{
+    dispatcher->redeliverRolledBackMessages() ;
+}
+
+/*
+ * Transport callback that handles messages dispatching
+ */
+void MessageConsumer::dispatch(p<IMessage> message)
+{
+    dispatcher->enqueue(message) ;
+
+    // Activate background dispatch thread if async listener is set up
+    if( listener != NULL )
+        session->dispatch() ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::dispatchAsyncMessages()
+{
+    while( listener != NULL )
+    {
+        p<IMessage> message = dispatcher->dequeueNoWait() ;
+
+        if( message != NULL )
+        {
+            listener->onMessage(message) ;
+
+            // Auto acknowledge message if selected
+            autoAcknowledge(message) ;
+        }
+        else
+            break ;
+    }
+}
+
+/*
+ * IAcknowledger callback method.
+ */
+void MessageConsumer::acknowledge(p<ActiveMQMessage> message)
+{
+    doClientAcknowledge(message) ;
+}
+
+/*
+ * 
+ */
+void MessageConsumer::close()
+{
+    if( !closed )
+    {
+        closed = true ;
+    
+        // De-register consumer from broker
+        session->getConnection()->disposeOf( consumerInfo->getConsumerId() ) ;
+
+        // Reset internal state (prevent cyclic references)
+        session = NULL ;
+    }
+}
+
+
+// Implementation methods ------------------------------------------------
+
+/*
+ *
+ */
+void MessageConsumer::checkClosed() throw(CmsException) 
+{
+    if( closed )
+        throw ConnectionClosedException("Oops! Connection already closed") ;
+}
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::autoAcknowledge(p<IMessage> message)
+{
+    try
+    {
+        // Is the message an ActiveMQMessage? (throws bad_cast otherwise)
+        p<ActiveMQMessage> activeMessage = p_dyncast<ActiveMQMessage> (message) ;
+
+        // Register the handler for client acknowledgment
+        activeMessage->setAcknowledger( smartify(this) ) ;
+
+        if( acknowledgementMode != ClientAckMode )
+            doAcknowledge(activeMessage) ;
+    }
+    catch( bad_cast& bc )
+    {
+        // ignore
+    }
+    return message ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::doClientAcknowledge(p<ActiveMQMessage> message)
+{
+    if( acknowledgementMode == ClientAckMode )
+        doAcknowledge(message);
+}
+
+/*
+ *
+ */
+void MessageConsumer::doAcknowledge(p<Message> message)
+{
+    p<MessageAck> ack = createMessageAck(message) ;
+    //cout << "Sending Ack: " << ack->getAckType() << endl ;
+    session->getConnection()->syncRequest(ack) ;
+}
+
+/*
+ *
+ */
+p<MessageAck> MessageConsumer::createMessageAck(p<Message> message)
+{
+    p<MessageAck> ack = new MessageAck() ;
+
+    // Set ack properties
+    ack->setAckType( ConsumedAck ) ;
+    ack->setConsumerId( consumerInfo->getConsumerId() ) ;
+    ack->setDestination( message->getDestination() ) ;
+    ack->setFirstMessageId( message->getMessageId() ) ;
+    ack->setLastMessageId( message->getMessageId() ) ;
+    ack->setMessageCount( 1 ) ;
+    
+    if( session->isTransacted() )
+    {
+        session->doStartTransaction() ;
+        ack->setTransactionId( session->getTransactionContext()->getTransactionId() ) ;
+        session->getTransactionContext()->addSynchronization( new MessageConsumerSynchronization(smartify(this), message) ) ;
+    }
+    return ack ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::afterRollback(p<ActiveMQMessage> message)
+{
+    // Try redeliver of the message again
+    message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 ) ;
+
+    // Check if redeliver count has exceeded maximum
+    if( message->getRedeliveryCounter() > maximumRedeliveryCount )
+    {
+        // Send back a poisoned pill
+        p<MessageAck> ack = new MessageAck() ;
+        ack->setAckType( PoisonAck ) ;
+        ack->setConsumerId( consumerInfo->getConsumerId() ) ;
+        ack->setDestination( message->getDestination() ) ;
+        ack->setFirstMessageId( message->getMessageId() ) ;
+        ack->setLastMessageId( message->getMessageId() ) ;
+        ack->setMessageCount( 1 ) ;
+        session->getConnection()->oneway(ack) ;
+    }
+    else
+    {
+        dispatcher->redeliver(message) ;
+        
+        // Re-dispatch the message at some point in the future
+        if( listener != NULL )
+            session->dispatch( redeliveryTimeout ) ;
+    }
+}

Propchange: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp?rev=426431&r1=426430&r2=426431&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp (original)
+++ incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp Fri Jul 28 01:22:48 2006
@@ -1,102 +1,102 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ActiveMQ_MessageConsumer_hpp_
-#define ActiveMQ_MessageConsumer_hpp_
-
-#include <string>
-#include "cms/IDestination.hpp"
-#include "cms/IMessage.hpp"
-#include "cms/IMessageConsumer.hpp"
-#include "cms/IMessageListener.hpp"
-#include "cms/CmsException.hpp"
-#include "activemq/AcknowledgementMode.hpp"
-#include "activemq/MessageAckType.hpp"
-#include "activemq/Dispatcher.hpp"
-#include "activemq/IAcknowledger.hpp"
-#include "activemq/MessageConsumerSynchronization.hpp"
-#include "activemq/ConnectionClosedException.hpp"
-#include "activemq/command/ConsumerInfo.hpp"
-#include "activemq/command/Message.hpp"
-#include "activemq/command/ActiveMQMessage.hpp"
-#include "activemq/command/MessageAck.hpp"
-#include "ppr/util/ifr/p"
-#include "ppr/thread/Thread.hpp"
-
-// Turn off warning message for ignored exception specification
-#ifdef _MSC_VER
-#pragma warning( disable : 4290 )
-#endif
-
-namespace apache
-{
-  namespace activemq
-  {
-      using namespace ifr;
-      using namespace apache::cms;
-      using namespace apache::ppr::thread;
-      class Session ;
-
-/*
- * 
- */
-class MessageConsumer : public IMessageConsumer, public IAcknowledger
-{
-private:
-    p<Session>          session ;
-    p<ConsumerInfo>     consumerInfo ;
-    p<Dispatcher>       dispatcher ;
-    p<IMessageListener> listener ;
-    AcknowledgementMode acknowledgementMode ;
-    bool                closed ;
-    int                 maximumRedeliveryCount,
-                        redeliveryTimeout ;
-
-public:
-    MessageConsumer(p<Session> session, p<ConsumerInfo> consumerInfo, AcknowledgementMode acknowledgementMode) ;
-    virtual ~MessageConsumer() ;
-
-    virtual void setMessageListener(p<IMessageListener> listener) ;
-    virtual p<IMessageListener> getMessageListener() ;
-    virtual p<ConsumerId> getConsumerId() ;
-    virtual void setMaximumRedeliveryCount(int count) ;
-    virtual int getMaximumRedeliveryCount() ;
-    virtual void setRedeliveryTimeout(int timeout) ;
-    virtual int getRedeliveryTimeout() ;
-
-    virtual p<IMessage> receive() ;
-    virtual p<IMessage> receive(int timeout) ;
-    virtual p<IMessage> receiveNoWait() ;
-    virtual void redeliverRolledBackMessages() ;
-    virtual void dispatch(p<IMessage> message) ;
-    virtual void dispatchAsyncMessages() ;
-    virtual void afterRollback(p<ActiveMQMessage> message) ;
-    virtual void acknowledge(p<ActiveMQMessage> message) ;
-    virtual void close() ;
-
-protected:
-    void checkClosed() throw(CmsException) ;
-    p<IMessage> autoAcknowledge(p<IMessage> message) ;
-    void doClientAcknowledge(p<ActiveMQMessage> message) ;
-    void doAcknowledge(p<Message> message) ;
-    p<MessageAck> createMessageAck(p<Message> message) ;
-} ;
-
-/* namespace */
-  }
-}
-
-#endif /*ActiveMQ_IMessageConsumer_hpp_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageConsumer_hpp_
+#define ActiveMQ_MessageConsumer_hpp_
+
+#include <string>
+#include "cms/IDestination.hpp"
+#include "cms/IMessage.hpp"
+#include "cms/IMessageConsumer.hpp"
+#include "cms/IMessageListener.hpp"
+#include "cms/CmsException.hpp"
+#include "activemq/AcknowledgementMode.hpp"
+#include "activemq/MessageAckType.hpp"
+#include "activemq/Dispatcher.hpp"
+#include "activemq/IAcknowledger.hpp"
+#include "activemq/MessageConsumerSynchronization.hpp"
+#include "activemq/ConnectionClosedException.hpp"
+#include "activemq/command/ConsumerInfo.hpp"
+#include "activemq/command/Message.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/MessageAck.hpp"
+#include "ppr/util/ifr/p"
+#include "ppr/thread/Thread.hpp"
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+namespace apache
+{
+  namespace activemq
+  {
+      using namespace ifr;
+      using namespace apache::cms;
+      using namespace apache::ppr::thread;
+      class Session ;
+
+/*
+ * 
+ */
+class MessageConsumer : public IMessageConsumer, public IAcknowledger
+{
+private:
+    p<Session>          session ;
+    p<ConsumerInfo>     consumerInfo ;
+    p<Dispatcher>       dispatcher ;
+    p<IMessageListener> listener ;
+    AcknowledgementMode acknowledgementMode ;
+    bool                closed ;
+    int                 maximumRedeliveryCount,
+                        redeliveryTimeout ;
+
+public:
+    MessageConsumer(p<Session> session, p<ConsumerInfo> consumerInfo, AcknowledgementMode acknowledgementMode) ;
+    virtual ~MessageConsumer() ;
+
+    virtual void setMessageListener(p<IMessageListener> listener) ;
+    virtual p<IMessageListener> getMessageListener() ;
+    virtual p<ConsumerId> getConsumerId() ;
+    virtual void setMaximumRedeliveryCount(int count) ;
+    virtual int getMaximumRedeliveryCount() ;
+    virtual void setRedeliveryTimeout(int timeout) ;
+    virtual int getRedeliveryTimeout() ;
+
+    virtual p<IMessage> receive() ;
+    virtual p<IMessage> receive(int timeout) ;
+    virtual p<IMessage> receiveNoWait() ;
+    virtual void redeliverRolledBackMessages() ;
+    virtual void dispatch(p<IMessage> message) ;
+    virtual void dispatchAsyncMessages() ;
+    virtual void afterRollback(p<ActiveMQMessage> message) ;
+    virtual void acknowledge(p<ActiveMQMessage> message) ;
+    virtual void close() ;
+
+protected:
+    void checkClosed() throw(CmsException) ;
+    p<IMessage> autoAcknowledge(p<IMessage> message) ;
+    void doClientAcknowledge(p<ActiveMQMessage> message) ;
+    void doAcknowledge(p<Message> message) ;
+    p<MessageAck> createMessageAck(p<Message> message) ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_IMessageConsumer_hpp_*/

Propchange: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp?rev=426431&r1=426430&r2=426431&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp (original)
+++ incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp Fri Jul 28 01:22:48 2006
@@ -1,60 +1,60 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/MessageConsumerSynchronization.hpp"
-#include "activemq/MessageConsumer.hpp"
-
-using namespace apache::activemq;
-
-/*
- * 
- */
-MessageConsumerSynchronization::MessageConsumerSynchronization(p<MessageConsumer> consumer, p<Message> message)
-{
-    this->consumer = consumer ;
-    this->message  = message ;
-}
-
-/*
- * 
- */
-MessageConsumerSynchronization::~MessageConsumerSynchronization()
-{
-}
-
-/*
- * 
- */
-void MessageConsumerSynchronization::beforeCommit()
-{
-    // no-op
-}
-
-/*
- * 
- */
-void MessageConsumerSynchronization::afterCommit()
-{
-    // no-op
-}
-
-/*
- * 
- */
-void MessageConsumerSynchronization::afterRollback()
-{
-    consumer->afterRollback( p_cast<ActiveMQMessage> (message)) ;
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/MessageConsumerSynchronization.hpp"
+#include "activemq/MessageConsumer.hpp"
+
+using namespace apache::activemq;
+
+/*
+ * Stores the consumer and the message that afterRollback() later hands back to that consumer.
+ */
+MessageConsumerSynchronization::MessageConsumerSynchronization(p<MessageConsumer> consumer, p<Message> message)
+{
+    this->consumer = consumer ;
+    this->message  = message ;
+}
+
+/*
+ * Destructor; performs no explicit cleanup.
+ */
+MessageConsumerSynchronization::~MessageConsumerSynchronization()
+{
+}
+
+/*
+ * Before-commit callback; intentionally does nothing.
+ */
+void MessageConsumerSynchronization::beforeCommit()
+{
+    // no-op
+}
+
+/*
+ * After-commit callback; intentionally does nothing.
+ */
+void MessageConsumerSynchronization::afterCommit()
+{
+    // no-op
+}
+
+/*
+ * After-rollback callback: passes the stored message, cast to ActiveMQMessage, to the consumer's afterRollback().
+ */
+void MessageConsumerSynchronization::afterRollback()
+{
+    consumer->afterRollback( p_cast<ActiveMQMessage> (message)) ;
+}

Propchange: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp?rev=426431&r1=426430&r2=426431&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp (original)
+++ incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp Fri Jul 28 01:22:48 2006
@@ -1,54 +1,54 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ActiveMQ_MessageConsumerSynchronization_hpp_
-#define ActiveMQ_MessageConsumerSynchronization_hpp_
-
-#include "activemq/ISynchronization.hpp"
-#include "activemq/command/ActiveMQMessage.hpp"
-#include "activemq/command/Message.hpp"
-#include "ppr/util/ifr/p"
-
-namespace apache
-{
-  namespace activemq
-  {
-    using namespace ifr;
-    class MessageConsumer;
-
-/*
- * 
- */
-class MessageConsumerSynchronization : public ISynchronization
-{
-private:
-    p<MessageConsumer> consumer ;
-    p<Message>         message ;
-
-public:
-    MessageConsumerSynchronization(p<MessageConsumer> consumer, p<Message> message) ;
-    ~MessageConsumerSynchronization() ;
-
-    virtual void beforeCommit() ;
-    virtual void afterCommit() ;
-    virtual void afterRollback() ;
-} ;
-
-/* namespace */
-  }
-}
-
-#endif /*ActiveMQ_MessageConsumerSynchronization_hpp_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageConsumerSynchronization_hpp_
+#define ActiveMQ_MessageConsumerSynchronization_hpp_
+
+#include "activemq/ISynchronization.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/Message.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace ifr;
+    class MessageConsumer;
+
+/*
+ * ISynchronization that holds a consumer/message pair and forwards afterRollback() to the consumer.
+ */
+class MessageConsumerSynchronization : public ISynchronization
+{
+private:
+    p<MessageConsumer> consumer ;   // consumer that is called from afterRollback()
+    p<Message>         message ;    // message handed back to the consumer
+
+public:
+    MessageConsumerSynchronization(p<MessageConsumer> consumer, p<Message> message) ;
+    ~MessageConsumerSynchronization() ;
+
+    virtual void beforeCommit() ;   // implemented as a no-op
+    virtual void afterCommit() ;    // implemented as a no-op
+    virtual void afterRollback() ;  // calls consumer->afterRollback() with the stored message
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_MessageConsumerSynchronization_hpp_*/

Propchange: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp?rev=426431&r1=426430&r2=426431&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp (original)
+++ incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp Fri Jul 28 01:22:48 2006
@@ -1,179 +1,179 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/MessageProducer.hpp"
-#include "activemq/Session.hpp"
-
-using namespace apache::activemq;
-
-
-// Constructors -----------------------------------------------------
-
-/*
- * 
- */
-MessageProducer::MessageProducer(p<Session> session, p<ProducerInfo> producerInfo)
-{
-    this->session                 = session ;
-    this->producerInfo            = producerInfo ;
-    this->priority                = DEFAULT_PRIORITY ;
-    this->timeToLive              = DEFAULT_TIMETOLIVE ;
-    this->messageCounter          = 0 ;
-    this->persistent              = false ;
-    this->disableMessageID        = false ;
-    this->disableMessageTimestamp = false ;
-    this->closed                  = false ;
-}
-
-/*
- * 
- */
-MessageProducer::~MessageProducer()
-{
-    // Make sure the producer is closed
-    close() ;
-}
-
-
-// Attribute methods ------------------------------------------------
-
-bool MessageProducer::getPersistent()
-{
-    return persistent ;
-}
-
-void MessageProducer::setPersistent(bool persistent)
-{
-    this->persistent = persistent ;
-}
-
-long long MessageProducer::getTimeToLive()
-{
-    return timeToLive ;
-}
-
-void MessageProducer::getTimeToLive(long long ttl)
-{
-    this->timeToLive = ttl ;
-}
-
-int MessageProducer::getPriority()
-{
-    return priority ;
-}
-
-void MessageProducer::getPriority(int priority)
-{
-    this->priority = priority ;
-}
-
-bool MessageProducer::getDisableMessageID()
-{
-    return disableMessageID ;
-}
-
-void MessageProducer::getDisableMessageID(bool disable)
-{
-    this->disableMessageID = disable ;
-}
-
-bool MessageProducer::getDisableMessageTimestamp()
-{
-    return disableMessageTimestamp ;
-}
-
-void MessageProducer::getDisableMessageTimestamp(bool disable)
-{
-    this->disableMessageTimestamp = disable ;
-}
-
-
-// Operation methods ------------------------------------------------
-
-/*
- * 
- */
-void MessageProducer::send(p<IMessage> message)
-{
-    send(producerInfo->getDestination(), message, DEFAULT_PRIORITY, DEFAULT_TIMETOLIVE) ;
-}
-
-/*
- * 
- */
-void MessageProducer::send(p<IDestination> destination, p<IMessage> message)
-{
-    send(destination, message, DEFAULT_PRIORITY, DEFAULT_TIMETOLIVE) ;
-}
-
-/*
- * 
- */
-void MessageProducer::send(p<IDestination> destination, p<IMessage> message, char priority, long long timeToLive)
-{
-    p<MessageId> msgId = new MessageId() ;
-    msgId->setProducerId( producerInfo->getProducerId() ) ;
-
-    // Acquire next sequence id
-    {
-        LOCKED_SCOPE (mutex);
-        msgId->setProducerSequenceId( ++messageCounter ) ;
-    }
-
-    // Configure the message
-    p<ActiveMQMessage> activeMessage = p_dyncast<ActiveMQMessage> (message) ;
-    activeMessage->setMessageId( msgId ) ;
-    activeMessage->setProducerId( producerInfo->getProducerId() ) ;
-    activeMessage->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ;
-    activeMessage->setPriority(priority) ;
-
-    if( session->isTransacted() )
-    {
-        session->doStartTransaction() ;
-        activeMessage->setTransactionId( session->getTransactionContext()->getTransactionId() ) ;
-    }
-
-    // Set time values if not disabled
-    if( !this->disableMessageTimestamp )
-    {
-        long long timestamp = Time::getCurrentTimeMillis() ;
-
-        // Set message time stamp/expiration
-        activeMessage->setTimestamp(timestamp) ;
-        if( timeToLive > 0 )
-            activeMessage->setExpiration( timestamp + timeToLive ) ;
-    }
-
-    // Finally, transmit the message
-    session->doSend(destination, message) ;
-}
-
-/*
- * 
- */
-void MessageProducer::close()
-{
-    if( !closed )
-    {
-        closed = true ;
-    
-        // De-register producer from broker
-        session->getConnection()->disposeOf( producerInfo->getProducerId() ) ;
-
-        // Reset internal state (prevent cyclic references)
-        session = NULL ;
-    }
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/MessageProducer.hpp"
+#include "activemq/Session.hpp"
+
+using namespace apache::activemq;
+
+
+// Constructors -----------------------------------------------------
+
+/*
+ * Stores session and producer info; priority and time-to-live start at their defaults, the counter at 0, all flags false.
+ */
+MessageProducer::MessageProducer(p<Session> session, p<ProducerInfo> producerInfo)
+{
+    this->session                 = session ;
+    this->producerInfo            = producerInfo ;
+    this->priority                = DEFAULT_PRIORITY ;
+    this->timeToLive              = DEFAULT_TIMETOLIVE ;
+    this->messageCounter          = 0 ;
+    this->persistent              = false ;
+    this->disableMessageID        = false ;
+    this->disableMessageTimestamp = false ;
+    this->closed                  = false ;
+}
+
+/*
+ * Destructor; calls close(), which only acts if the producer has not been closed yet.
+ */
+MessageProducer::~MessageProducer()
+{
+    // Make sure the producer is closed
+    close() ;
+}
+
+
+// Attribute methods ------------------------------------------------
+
+bool MessageProducer::getPersistent()   // returns the stored persistent flag
+{
+    return persistent ;
+}
+
+void MessageProducer::setPersistent(bool persistent)   // stores the persistent flag
+{
+    this->persistent = persistent ;   // "this->" needed: the parameter shadows the member
+}
+
+long long MessageProducer::getTimeToLive()   // returns the stored time-to-live
+{
+    return timeToLive ;
+}
+
+void MessageProducer::getTimeToLive(long long ttl)   // setter despite the "get" name: stores ttl as the time-to-live
+{
+    this->timeToLive = ttl ;
+}
+
+int MessageProducer::getPriority()   // returns the stored priority
+{
+    return priority ;
+}
+
+void MessageProducer::getPriority(int priority)   // setter despite the "get" name: stores the priority
+{
+    this->priority = priority ;   // "this->" needed: the parameter shadows the member
+}
+
+bool MessageProducer::getDisableMessageID()   // returns the stored disableMessageID flag
+{
+    return disableMessageID ;
+}
+
+void MessageProducer::getDisableMessageID(bool disable)   // setter despite the "get" name: stores the disableMessageID flag
+{
+    this->disableMessageID = disable ;
+}
+
+bool MessageProducer::getDisableMessageTimestamp()   // returns the stored disableMessageTimestamp flag
+{
+    return disableMessageTimestamp ;
+}
+
+void MessageProducer::getDisableMessageTimestamp(bool disable)   // setter despite the "get" name: stores the disableMessageTimestamp flag
+{
+    this->disableMessageTimestamp = disable ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ * Sends to the producer's own destination using the priority and time-to-live configured on this producer.
+ */
+void MessageProducer::send(p<IMessage> message)
+{
+    send(producerInfo->getDestination(), message, static_cast<char>(priority), timeToLive) ;
+}
+
+/*
+ * Sends to the given destination using the priority and time-to-live configured on this producer.
+ */
+void MessageProducer::send(p<IDestination> destination, p<IMessage> message)
+{
+    send(destination, message, static_cast<char>(priority), timeToLive) ;
+}
+
+/*
+ * Assigns a fresh message id, fills in producer, destination, priority, transaction and time fields, then sends via the session.
+ */
+void MessageProducer::send(p<IDestination> destination, p<IMessage> message, char priority, long long timeToLive)
+{
+    p<MessageId> msgId = new MessageId() ;
+    msgId->setProducerId( producerInfo->getProducerId() ) ;
+
+    // Acquire next sequence id
+    {
+        LOCKED_SCOPE (mutex);   // presumably holds `mutex` for this scope (macro defined elsewhere - confirm)
+        msgId->setProducerSequenceId( ++messageCounter ) ;
+    }
+
+    // Configure the message
+    p<ActiveMQMessage> activeMessage = p_dyncast<ActiveMQMessage> (message) ;   // NOTE(review): used unchecked below - confirm the cast cannot yield NULL
+    activeMessage->setMessageId( msgId ) ;
+    activeMessage->setProducerId( producerInfo->getProducerId() ) ;
+    activeMessage->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ;
+    activeMessage->setPriority(priority) ;   // the parameter, not the member of the same name
+
+    if( session->isTransacted() )
+    {
+        session->doStartTransaction() ;
+        activeMessage->setTransactionId( session->getTransactionContext()->getTransactionId() ) ;
+    }
+
+    // Set time values if not disabled
+    if( !this->disableMessageTimestamp )
+    {
+        long long timestamp = Time::getCurrentTimeMillis() ;
+
+        // Set message time stamp/expiration
+        activeMessage->setTimestamp(timestamp) ;
+        if( timeToLive > 0 )   // a non-positive time-to-live leaves the expiration unset here
+            activeMessage->setExpiration( timestamp + timeToLive ) ;
+    }
+
+    // Finally, transmit the message
+    session->doSend(destination, message) ;
+}
+
+/*
+ * Closes the producer on the first call; later calls do nothing because of the `closed` flag.
+ */
+void MessageProducer::close()
+{
+    if( !closed )
+    {
+        closed = true ;
+    
+        // De-register producer from broker
+        session->getConnection()->disposeOf( producerInfo->getProducerId() ) ;
+
+        // Reset internal state (prevent cyclic references)
+        session = NULL ;   // NOTE(review): the send() overloads use session without checking `closed`
+    }
+}

Propchange: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp?rev=426431&r1=426430&r2=426431&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp (original)
+++ incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp Fri Jul 28 01:22:48 2006
@@ -1,91 +1,91 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ActiveMQ_MessageProducer_hpp_
-#define ActiveMQ_MessageProducer_hpp_
-
-#include <string>
-#include "cms/IDestination.hpp"
-#include "cms/IMessage.hpp"
-#include "cms/IMessageProducer.hpp"
-#include "activemq/command/ProducerInfo.hpp"
-#include "ppr/thread/SimpleMutex.hpp"
-#include "ppr/util/Time.hpp"
-#include "ppr/util/ifr/p"
-
-namespace apache
-{
-  namespace activemq
-  {
-    using namespace ifr;
-    using namespace apache::cms;
-    using namespace apache::activemq::command;
-    using namespace apache::ppr;
-    using namespace apache::ppr::thread;
-    class Session ;
-
-/*
- * 
- */
-class MessageProducer : public IMessageProducer
-{
-private:
-    p<Session>      session ;
-    p<ProducerInfo> producerInfo ;
-    SimpleMutex     mutex ;
-    int             priority ;
-    long long       messageCounter,
-                    timeToLive ;
-    bool            closed,
-                    persistent,
-                    disableMessageID,
-                    disableMessageTimestamp ;
-
-    // Default message values
-    const static char      DEFAULT_PRIORITY   = 4 ;
-    const static long long DEFAULT_TIMETOLIVE = 0 ;
-
-public:
-    MessageProducer(p<Session> session, p<ProducerInfo> producerInfo) ;
-    virtual ~MessageProducer() ;
-
-    // Attribute methods
-	virtual bool getPersistent() ;
-	virtual void setPersistent(bool persistent) ;
-    virtual long long getTimeToLive() ;
-    virtual void getTimeToLive(long long ttl) ;
-    virtual int getPriority() ;
-    virtual void getPriority(int priority) ;
-    virtual bool getDisableMessageID() ;
-    virtual void getDisableMessageID(bool disable) ;
-    virtual bool getDisableMessageTimestamp() ;
-    virtual void getDisableMessageTimestamp(bool disable) ;
-
-    // Operation methods
-    virtual void send(p<IMessage> message) ;
-    virtual void send(p<IDestination> destination, p<IMessage> message) ;
-    virtual void send(p<IDestination> destination, p<IMessage> message, char priority, long long timeToLive) ;
-    virtual void close() ;
-
-private:
-    long long getCurrentTimeMillis() ;
-} ;
-
-/* namespace */
-  }
-}
-
-#endif /*ActiveMQ_MessageProducer_hpp_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageProducer_hpp_
+#define ActiveMQ_MessageProducer_hpp_
+
+#include <string>
+#include "cms/IDestination.hpp"
+#include "cms/IMessage.hpp"
+#include "cms/IMessageProducer.hpp"
+#include "activemq/command/ProducerInfo.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/util/Time.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace ifr;
+    using namespace apache::cms;
+    using namespace apache::activemq::command;
+    using namespace apache::ppr;
+    using namespace apache::ppr::thread;
+    class Session ;
+
+/*
+ * IMessageProducer implementation that stamps outgoing messages and sends them through its Session.
+ */
+class MessageProducer : public IMessageProducer
+{
+private:
+    p<Session>      session ;        // set to NULL by close()
+    p<ProducerInfo> producerInfo ;   // source of the producer id and default destination
+    SimpleMutex     mutex ;          // used around the messageCounter increment in send()
+    int             priority ;       // initialised to DEFAULT_PRIORITY
+    long long       messageCounter,  // last producer sequence id handed out
+                    timeToLive ;     // initialised to DEFAULT_TIMETOLIVE
+    bool            closed,
+                    persistent,
+                    disableMessageID,
+                    disableMessageTimestamp ;
+
+    // Default message values
+    const static char      DEFAULT_PRIORITY   = 4 ;
+    const static long long DEFAULT_TIMETOLIVE = 0 ;
+
+public:
+    MessageProducer(p<Session> session, p<ProducerInfo> producerInfo) ;
+    virtual ~MessageProducer() ;
+
+    // Attribute methods
+	virtual bool getPersistent() ;
+	virtual void setPersistent(bool persistent) ;
+    virtual long long getTimeToLive() ;
+    virtual void getTimeToLive(long long ttl) ;                // setter despite the "get" name
+    virtual int getPriority() ;
+    virtual void getPriority(int priority) ;                   // setter despite the "get" name
+    virtual bool getDisableMessageID() ;
+    virtual void getDisableMessageID(bool disable) ;           // setter despite the "get" name
+    virtual bool getDisableMessageTimestamp() ;
+    virtual void getDisableMessageTimestamp(bool disable) ;    // setter despite the "get" name
+
+    // Operation methods
+    virtual void send(p<IMessage> message) ;
+    virtual void send(p<IDestination> destination, p<IMessage> message) ;
+    virtual void send(p<IDestination> destination, p<IMessage> message, char priority, long long timeToLive) ;
+    virtual void close() ;
+
+private:
+    long long getCurrentTimeMillis() ;   // NOTE(review): no definition in MessageProducer.cpp; send() uses Time::getCurrentTimeMillis()
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_MessageProducer_hpp_*/

Propchange: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/Session.cpp?rev=426431&r1=426430&r2=426431&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/Session.cpp (original)
+++ incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/Session.cpp Fri Jul 28 01:22:48 2006
@@ -1,508 +1,508 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/Session.hpp"
-#include "activemq/command/ActiveMQDestination.hpp"
-#include "activemq/command/ActiveMQQueue.hpp"
-#include "activemq/command/ActiveMQTopic.hpp"
-#include "activemq/command/ActiveMQTempQueue.hpp"
-#include "activemq/command/ActiveMQTempTopic.hpp"
-#include "activemq/command/ActiveMQMessage.hpp"
-#include "activemq/command/ActiveMQBytesMessage.hpp"
-#include "activemq/command/ActiveMQMapMessage.hpp"
-#include "activemq/command/ActiveMQTextMessage.hpp"
-#include "activemq/command/ProducerInfo.hpp"
-#include "activemq/command/ConsumerInfo.hpp"
-#include "activemq/command/MessageAck.hpp"
-#include "activemq/MessageConsumer.hpp"
-#include "activemq/MessageProducer.hpp"
-#include "activemq/Connection.hpp"
-
-using namespace apache::activemq;
-
-
-// Constructors -----------------------------------------------------
-
-/*
- * 
- */
-Session::Session(p<Connection> connection, p<SessionInfo> info, AcknowledgementMode ackMode)
-{
-    this->connection         = connection ;
-    this->sessionInfo        = info ;
-    this->ackMode            = ackMode ;
-    this->prefetchSize       = 1000 ;
-    this->consumerCounter    = 0 ;
-    this->producerCounter    = 0 ;
-    this->transactionContext = new TransactionContext(smartify(this)) ;
-    this->dispatchThread     = new DispatchThread(smartify(this)) ;
-    this->closed             = false ;
-
-    // Activate backround dispatch thread
-    dispatchThread->start() ;
-}
-
-/*
- * 
- */
-Session::~Session()
-{
-    // Make sure session is closed
-    close() ;
-}
-
-
-// Attribute methods ------------------------------------------------
-
-/*
- * 
- */
-bool Session::isTransacted()
-{
-    return ( ackMode == TransactionalAckMode ) ? true : false ; 
-}
-
-/*
- * 
- */
-p<Connection> Session::getConnection()
-{
-    return connection ;
-}
-
-/*
- * 
- */
-p<SessionId> Session::getSessionId()
-{
-    return sessionInfo->getSessionId() ;
-}
-
-/*
- * 
- */
-p<TransactionContext> Session::getTransactionContext()
-{
-    return transactionContext ;
-}
-
-/*
- * 
- */
-p<MessageConsumer> Session::getConsumer(p<ConsumerId> consumerId)
-{
-    map<long long, p<MessageConsumer> >::iterator tempIter ;
-
-    // Check if key exists in map
-    tempIter = consumers.find( consumerId->getValue() ) ;
-    if( tempIter == consumers.end() )
-        return NULL ;
-    else
-        return tempIter->second ;
-}
-
-/*
- * 
- */
-p<MessageProducer> Session::getProducer(p<ProducerId> producerId)
-{
-    map<long long, p<MessageProducer> >::iterator tempIter ;
-
-    // Check if key exists in map
-    tempIter = producers.find( producerId->getValue() ) ;
-    if( tempIter == producers.end() )
-        return NULL ;
-    else
-        return tempIter->second ;
-}
-
-
-// Operation methods ------------------------------------------------
-
-/*
- * 
- */
-p<IMessageProducer> Session::createProducer()
-{
-    return createProducer(NULL) ; 
-}
-
-/*
- * 
- */
-p<IMessageProducer> Session::createProducer(p<IDestination> destination)
-{
-    p<ProducerInfo> command  = createProducerInfo(destination) ;
-    p<ProducerId> producerId = command->getProducerId() ;
-
-    try
-    {
-        p<MessageProducer> producer = new MessageProducer(smartify(this), command) ;
-
-        // Save the producer
-        producers[ producerId->getValue() ] = producer ;
-
-        // Register producer with broker
-        connection->syncRequest(command) ;
-
-        return producer ;
-    }
-    catch( exception e )
-    {
-        // Make sure producer was removed
-        producers[ producerId->getValue() ] = NULL ;
-        throw e ;
-    }
-}
-
-/*
- * 
- */
-p<IMessageConsumer> Session::createConsumer(p<IDestination> destination)
-{
-    return createConsumer(destination, NULL) ; 
-}
-
-/*
- * 
- */
-p<IMessageConsumer> Session::createConsumer(p<IDestination> destination, const char* selector)
-{
-    p<ConsumerInfo> command  = createConsumerInfo(destination, selector) ;
-    p<ConsumerId> consumerId = command->getConsumerId() ;
-
-    try
-    {
-        p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
-
-        // Save the consumer first in case message dispatching starts immediately
-        consumers[ consumerId->getValue() ] = consumer ;
-
-        // Register consumer with broker
-        connection->syncRequest(command) ;
-
-        return consumer ;
-    }
-    catch( exception e )
-    {
-        // Make sure consumer was removed
-        consumers[ consumerId->getValue() ] = NULL ;
-        throw e ;
-    }
-}
-
-p<IMessageConsumer> Session::createDurableConsumer(p<ITopic> destination, const char* name, const char* selector, bool noLocal)
-{
-    p<ConsumerInfo> command  = createConsumerInfo(destination, selector) ;
-    p<ConsumerId> consumerId = command->getConsumerId() ;
-    p<string>     subscriptionName = new string(name) ;
-
-    command->setSubcriptionName( subscriptionName ) ;
-    command->setNoLocal( noLocal ) ;
-    
-    try
-    {
-        p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
-
-        // Save the consumer first in case message dispatching starts immediately
-        consumers[ consumerId->getValue() ] = consumer ;
-
-        // Register consumer with broker
-        connection->syncRequest(command) ;
-
-        return consumer ;
-    }
-    catch( exception e )
-    {
-        // Make sure consumer was removed
-        consumers[ consumerId->getValue() ] = NULL ;
-        throw e ;
-    }
-}
-
-/*
- * 
- */
-p<IQueue> Session::getQueue(const char* name)
-{
-    p<IQueue> queue = new ActiveMQQueue(name) ;
-    return queue ;
-}
-
-/*
- * 
- */
-p<ITopic> Session::getTopic(const char* name)
-{
-    p<ITopic> topic = new ActiveMQTopic(name) ;
-    return topic ;
-}
-
-/*
- * 
- */
-p<ITemporaryQueue> Session::createTemporaryQueue()
-{
-    p<ITemporaryQueue> queue = new ActiveMQTempQueue( connection->createTemporaryDestinationName()->c_str() ) ;
-    return queue ;
-}
-
-/*
- * 
- */
-p<ITemporaryTopic> Session::createTemporaryTopic()
-{
-    p<ITemporaryTopic> topic = new ActiveMQTempTopic( connection->createTemporaryDestinationName()->c_str() ) ;
-    return topic ;
-}
-
-/*
- * 
- */
-p<IMessage> Session::createMessage()
-{
-    p<IMessage> message = new ActiveMQMessage() ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-p<IBytesMessage> Session::createBytesMessage()
-{
-    p<IBytesMessage> message = new ActiveMQBytesMessage() ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-p<IBytesMessage> Session::createBytesMessage(char* body, int size)
-{
-    p<IBytesMessage> message = new ActiveMQBytesMessage( body, size ) ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-p<IMapMessage> Session::createMapMessage()
-{
-    p<IMapMessage> message = new ActiveMQMapMessage() ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-p<ITextMessage> Session::createTextMessage()
-{
-    p<ITextMessage> message = new ActiveMQTextMessage() ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-p<ITextMessage> Session::createTextMessage(const char* text)
-{
-    p<ITextMessage> message = new ActiveMQTextMessage(text) ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-void Session::commit() throw(CmsException)
-{
-    if( !isTransacted() )
-        throw CmsException("You cannot perform a commit on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
-
-    transactionContext->commit() ;
-}
-
-/*
- * 
- */
-void Session::rollback() throw(CmsException)
-{
-    if( !isTransacted() )
-        throw CmsException("You cannot perform a rollback on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
-
-    transactionContext->rollback() ;
-
-    map<long long, p<MessageConsumer> >::const_iterator tempIter ;
-
-    // Ensure all the consumers redeliver any rolled back messages
-    for( tempIter = consumers.begin() ;
-         tempIter != consumers.end() ;
-         tempIter++ )
-    {
-        ((*tempIter).second)->redeliverRolledBackMessages() ;
-    }
-}
-
-/*
- * 
- */
-void Session::doSend(p<IDestination> destination, p<IMessage> message)
-{
-    p<ActiveMQMessage> command = p_dyncast<ActiveMQMessage> (message) ;
-    // TODO complete packet
-    connection->syncRequest(command) ;
-}
-
-/*
- * Starts a new transaction
- */
-void Session::doStartTransaction()
-{
-    if( isTransacted() )
-        transactionContext->begin() ;
-}
-
-/*
- * 
- */
-void Session::dispatch(int delay)
-{
-    if( delay > 0 ) 
-        dispatchThread->sleep(delay) ;
-
-    dispatchThread->wakeup() ;
-}
-
-/*
- * 
- */
-void Session::dispatchAsyncMessages()
-{
-    // Ensure that only 1 thread dispatches messages in a consumer at once
-    LOCKED_SCOPE (mutex);
-
-    map<long long, p<MessageConsumer> >::const_iterator tempIter ;
-
-    // Iterate through each consumer created by this session
-    // ensuring that they have all pending messages dispatched
-    for( tempIter = consumers.begin() ;
-         tempIter != consumers.end() ;
-         tempIter++ )
-    {
-        ((*tempIter).second)->dispatchAsyncMessages() ;
-    }
-}
-
-/*
- *
- */
-void Session::close()
-{
-    if( !closed )
-    {
-        map<long long, p<MessageConsumer> >::iterator consumerIter ;
-        map<long long, p<MessageProducer> >::iterator producerIter ;
-
-        // Iterate through all consumers and close them down
-        for( consumerIter = consumers.begin() ;
-             consumerIter != consumers.end() ;
-             consumerIter++ )
-        {
-            consumerIter->second->close() ;
-            consumerIter->second = NULL ;
-        }
-
-        // Iterate through all producers and close them down
-        for( producerIter = producers.begin() ;
-             producerIter != producers.end() ;
-             producerIter++ )
-        {
-            producerIter->second->close() ;
-            producerIter->second = NULL ;
-        }
-        // De-register session from broker
-        connection->disposeOf( sessionInfo->getSessionId() ) ;
-
-        // Clean up
-        connection = NULL ;
-        closed     = true ;
-    }
-}
-
-
-// Implementation methods ------------------------------------------
-
-/*
- * 
- */
-p<ConsumerInfo> Session::createConsumerInfo(p<IDestination> destination, const char* selector)
-{
-    p<ConsumerInfo> consumerInfo = new ConsumerInfo() ;
-    p<ConsumerId> consumerId = new ConsumerId() ;
-
-    consumerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
-    consumerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
-
-    {
-        LOCKED_SCOPE (mutex);
-        consumerId->setValue( ++consumerCounter ) ;
-    }
-    p<string> sel = ( selector == NULL ) ? NULL : new string(selector) ;
-
-    // TODO complete packet
-    consumerInfo->setConsumerId( consumerId ) ;
-    consumerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
-    consumerInfo->setSelector( sel ) ;
-    consumerInfo->setPrefetchSize( this->prefetchSize ) ;
-
-    return consumerInfo ;
-}
-
-/*
- * 
- */
-p<ProducerInfo> Session::createProducerInfo(p<IDestination> destination)
-{
-    p<ProducerInfo> producerInfo = new ProducerInfo() ;
-    p<ProducerId> producerId = new ProducerId() ;
-
-    producerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
-    producerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
-
-    {
-        LOCKED_SCOPE (mutex);
-        producerId->setValue( ++producerCounter ) ;
-    }
-
-    // TODO complete packet
-    producerInfo->setProducerId( producerId ) ;
-    producerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
-
-    return producerInfo ;
-} 
-
-/*
- * Configures the message command.
- */
-void Session::configure(p<IMessage> message)
-{
-    // TODO:
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/Session.hpp"
+
+#include <sstream>
+#include <string>
+
+#include "activemq/command/ActiveMQDestination.hpp"
+#include "activemq/command/ActiveMQQueue.hpp"
+#include "activemq/command/ActiveMQTopic.hpp"
+#include "activemq/command/ActiveMQTempQueue.hpp"
+#include "activemq/command/ActiveMQTempTopic.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/ActiveMQBytesMessage.hpp"
+#include "activemq/command/ActiveMQMapMessage.hpp"
+#include "activemq/command/ActiveMQTextMessage.hpp"
+#include "activemq/command/ProducerInfo.hpp"
+#include "activemq/command/ConsumerInfo.hpp"
+#include "activemq/command/MessageAck.hpp"
+#include "activemq/MessageConsumer.hpp"
+#include "activemq/MessageProducer.hpp"
+#include "activemq/Connection.hpp"
+
+using namespace apache::activemq;
+
+
+// Constructors -----------------------------------------------------
+
+/*
+ * Creates a session on the given connection. Stores the session info and
+ * acknowledgement mode, sets the prefetch size to 1000, zeroes the consumer
+ * and producer id counters, creates the transaction context and the
+ * dispatch thread, and starts that thread before returning.
+ */
+Session::Session(p<Connection> connection, p<SessionInfo> info, AcknowledgementMode ackMode)
+{
+    // These two parameters shadow the members, so this-> is required
+    this->connection = connection ;
+    sessionInfo      = info ;
+    this->ackMode    = ackMode ;
+
+    prefetchSize    = 1000 ;
+    consumerCounter = 0 ;
+    producerCounter = 0 ;
+
+    // Both helpers receive a smart pointer to this session
+    transactionContext = new TransactionContext(smartify(this)) ;
+    dispatchThread     = new DispatchThread(smartify(this)) ;
+    closed             = false ;
+
+    // Activate background dispatch thread
+    dispatchThread->start() ;
+}
+
+/*
+ * Destroys the session, closing it first if close() has not already run.
+ * Failures raised while closing are contained here and never propagate.
+ */
+Session::~Session()
+{
+    // close() calls into the connection and into every consumer and
+    // producer. An exception escaping a destructor while the stack is
+    // already unwinding terminates the program, so nothing may leave here.
+    try
+    {
+        close() ;
+    }
+    catch( ... )
+    {
+        // Best effort only: a destructor has no caller to report to
+    }
+}
+
+
+// Attribute methods ------------------------------------------------
+
+/*
+ * Reports whether the session's acknowledgement mode is
+ * TransactionalAckMode.
+ */
+bool Session::isTransacted()
+{
+    // The comparison already yields the bool to return
+    return ackMode == TransactionalAckMode ;
+}
+
+/*
+ * Returns the connection this session was constructed with. close() resets
+ * the member to NULL, so a closed session returns NULL here.
+ */
+p<Connection> Session::getConnection()
+{
+    return connection ;
+}
+
+/*
+ * Returns the session id held by this session's SessionInfo.
+ */
+p<SessionId> Session::getSessionId()
+{
+    return sessionInfo->getSessionId() ;
+}
+
+/*
+ * Returns the transaction context that the constructor created for this
+ * session.
+ */
+p<TransactionContext> Session::getTransactionContext()
+{
+    return transactionContext ;
+}
+
+/*
+ * Looks up a consumer of this session by the value of its consumer id.
+ * Returns NULL when nothing is stored under that value.
+ */
+p<MessageConsumer> Session::getConsumer(p<ConsumerId> consumerId)
+{
+    map<long long, p<MessageConsumer> >::iterator found = consumers.find( consumerId->getValue() ) ;
+
+    // Unknown id: nothing to hand back
+    if( found == consumers.end() )
+        return NULL ;
+
+    return found->second ;
+}
+
+/*
+ * Looks up a producer of this session by the value of its producer id.
+ * Returns NULL when nothing is stored under that value.
+ */
+p<MessageProducer> Session::getProducer(p<ProducerId> producerId)
+{
+    map<long long, p<MessageProducer> >::iterator found = producers.find( producerId->getValue() ) ;
+
+    // Unknown id: nothing to hand back
+    if( found == producers.end() )
+        return NULL ;
+
+    return found->second ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ * Creates a producer without a destination by delegating to
+ * createProducer(destination) with NULL.
+ */
+p<IMessageProducer> Session::createProducer()
+{
+    return createProducer(NULL) ; 
+}
+
+/*
+ * Creates a producer for the given destination, stores it in the producers
+ * map under its producer id value and registers it with the broker through
+ * a synchronous request on the connection.
+ *
+ * If creation or registration throws, the map entry is removed again and
+ * the original exception is propagated to the caller unchanged.
+ */
+p<IMessageProducer> Session::createProducer(p<IDestination> destination)
+{
+    p<ProducerInfo> command  = createProducerInfo(destination) ;
+    p<ProducerId> producerId = command->getProducerId() ;
+
+    try
+    {
+        p<MessageProducer> producer = new MessageProducer(smartify(this), command) ;
+
+        // Save the producer
+        producers[ producerId->getValue() ] = producer ;
+
+        // Register producer with broker
+        connection->syncRequest(command) ;
+
+        return producer ;
+    }
+    catch( exception& )
+    {
+        // Erase the entry rather than storing NULL under the key: close()
+        // calls through every mapped pointer and must not meet a placeholder
+        producers.erase( producerId->getValue() ) ;
+
+        // Re-throw the in-flight exception. Catching by value and throwing
+        // the copy would slice a derived exception down to its base class.
+        throw ;
+    }
+}
+
+/*
+ * Creates a consumer without a message selector by delegating to the
+ * two-argument overload with a NULL selector.
+ */
+p<IMessageConsumer> Session::createConsumer(p<IDestination> destination)
+{
+    return createConsumer(destination, NULL) ; 
+}
+
+/*
+ * Creates a consumer for the given destination and optional selector,
+ * stores it in the consumers map under its consumer id value and registers
+ * it with the broker through a synchronous request on the connection.
+ *
+ * If creation or registration throws, the map entry is removed again and
+ * the original exception is propagated to the caller unchanged.
+ */
+p<IMessageConsumer> Session::createConsumer(p<IDestination> destination, const char* selector)
+{
+    p<ConsumerInfo> command  = createConsumerInfo(destination, selector) ;
+    p<ConsumerId> consumerId = command->getConsumerId() ;
+
+    try
+    {
+        p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
+
+        // Save the consumer first in case message dispatching starts immediately
+        consumers[ consumerId->getValue() ] = consumer ;
+
+        // Register consumer with broker
+        connection->syncRequest(command) ;
+
+        return consumer ;
+    }
+    catch( exception& )
+    {
+        // Erase the entry rather than storing NULL under the key: rollback(),
+        // dispatchAsyncMessages() and close() call through every mapped
+        // pointer and must not meet a placeholder
+        consumers.erase( consumerId->getValue() ) ;
+
+        // Re-throw the in-flight exception. Catching by value and throwing
+        // the copy would slice a derived exception down to its base class.
+        throw ;
+    }
+}
+
+/*
+ * Creates a consumer on a topic with a subscription name and no-local flag
+ * set on its ConsumerInfo, stores it in the consumers map under its
+ * consumer id value and registers it with the broker through a synchronous
+ * request on the connection.
+ *
+ * name is copied into a string, so it must not be NULL; selector may be
+ * NULL. If creation or registration throws, the map entry is removed again
+ * and the original exception is propagated to the caller unchanged.
+ */
+p<IMessageConsumer> Session::createDurableConsumer(p<ITopic> destination, const char* name, const char* selector, bool noLocal)
+{
+    p<ConsumerInfo> command  = createConsumerInfo(destination, selector) ;
+    p<ConsumerId> consumerId = command->getConsumerId() ;
+    p<string>     subscriptionName = new string(name) ;
+
+    command->setSubcriptionName( subscriptionName ) ;
+    command->setNoLocal( noLocal ) ;
+
+    try
+    {
+        p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
+
+        // Save the consumer first in case message dispatching starts immediately
+        consumers[ consumerId->getValue() ] = consumer ;
+
+        // Register consumer with broker
+        connection->syncRequest(command) ;
+
+        return consumer ;
+    }
+    catch( exception& )
+    {
+        // Erase the entry rather than storing NULL under the key: rollback(),
+        // dispatchAsyncMessages() and close() call through every mapped
+        // pointer and must not meet a placeholder
+        consumers.erase( consumerId->getValue() ) ;
+
+        // Re-throw the in-flight exception. Catching by value and throwing
+        // the copy would slice a derived exception down to its base class.
+        throw ;
+    }
+}
+
+/*
+ * Builds a queue destination object for the given queue name.
+ */
+p<IQueue> Session::getQueue(const char* name)
+{
+    return new ActiveMQQueue(name) ;
+}
+
+/*
+ * Builds a topic destination object for the given topic name.
+ */
+p<ITopic> Session::getTopic(const char* name)
+{
+    return new ActiveMQTopic(name) ;
+}
+
+/*
+ * Builds a temporary queue whose name is obtained from the connection's
+ * createTemporaryDestinationName().
+ */
+p<ITemporaryQueue> Session::createTemporaryQueue()
+{
+    return new ActiveMQTempQueue( connection->createTemporaryDestinationName()->c_str() ) ;
+}
+
+/*
+ * Builds a temporary topic whose name is obtained from the connection's
+ * createTemporaryDestinationName().
+ */
+p<ITemporaryTopic> Session::createTemporaryTopic()
+{
+    return new ActiveMQTempTopic( connection->createTemporaryDestinationName()->c_str() ) ;
+}
+
+/*
+ * Creates an empty generic message and passes it through configure().
+ */
+p<IMessage> Session::createMessage()
+{
+    p<IMessage> created = new ActiveMQMessage() ;
+
+    configure(created) ;
+    return created ;
+}
+
+/*
+ * Creates an empty bytes message and passes it through configure().
+ */
+p<IBytesMessage> Session::createBytesMessage()
+{
+    p<IBytesMessage> created = new ActiveMQBytesMessage() ;
+
+    configure(created) ;
+    return created ;
+}
+
+/*
+ * Creates a bytes message from the given body buffer and size, then passes
+ * it through configure().
+ */
+p<IBytesMessage> Session::createBytesMessage(char* body, int size)
+{
+    p<IBytesMessage> created = new ActiveMQBytesMessage( body, size ) ;
+
+    configure(created) ;
+    return created ;
+}
+
+/*
+ * Creates an empty map message and passes it through configure().
+ */
+p<IMapMessage> Session::createMapMessage()
+{
+    p<IMapMessage> created = new ActiveMQMapMessage() ;
+
+    configure(created) ;
+    return created ;
+}
+
+/*
+ * Creates an empty text message and passes it through configure().
+ */
+p<ITextMessage> Session::createTextMessage()
+{
+    p<ITextMessage> created = new ActiveMQTextMessage() ;
+
+    configure(created) ;
+    return created ;
+}
+
+/*
+ * Creates a text message from the given text, then passes it through
+ * configure().
+ */
+p<ITextMessage> Session::createTextMessage(const char* text)
+{
+    p<ITextMessage> created = new ActiveMQTextMessage(text) ;
+
+    configure(created) ;
+    return created ;
+}
+
+/*
+ * Commits the current transaction through the transaction context.
+ *
+ * Throws CmsException when the session is not transacted; the message ends
+ * with the numeric value of the session's acknowledgement mode.
+ */
+void Session::commit() throw(CmsException)
+{
+    if( !isTransacted() )
+    {
+        // Format the mode explicitly. Adding it to the string literal is
+        // pointer arithmetic: it skips that many leading characters of the
+        // message instead of appending the mode to it.
+        std::ostringstream text ;
+        text << "You cannot perform a commit on a non-transacted session. Acknowlegement mode is: "
+             << static_cast<int>(ackMode) ;
+
+        // NOTE(review): assumes CmsException copies the text it is given - confirm
+        const std::string message = text.str() ;
+        throw CmsException( message.c_str() ) ;
+    }
+
+    transactionContext->commit() ;
+}
+
+/*
+ * Rolls back the current transaction through the transaction context and
+ * then asks every consumer of this session to redeliver its rolled back
+ * messages.
+ *
+ * Throws CmsException when the session is not transacted; the message ends
+ * with the numeric value of the session's acknowledgement mode.
+ */
+void Session::rollback() throw(CmsException)
+{
+    if( !isTransacted() )
+    {
+        // Format the mode explicitly. Adding it to the string literal is
+        // pointer arithmetic: it skips that many leading characters of the
+        // message instead of appending the mode to it.
+        std::ostringstream text ;
+        text << "You cannot perform a rollback on a non-transacted session. Acknowlegement mode is: "
+             << static_cast<int>(ackMode) ;
+
+        // NOTE(review): assumes CmsException copies the text it is given - confirm
+        const std::string message = text.str() ;
+        throw CmsException( message.c_str() ) ;
+    }
+
+    transactionContext->rollback() ;
+
+    map<long long, p<MessageConsumer> >::const_iterator tempIter ;
+
+    // Ensure all the consumers redeliver any rolled back messages
+    for( tempIter = consumers.begin() ;
+         tempIter != consumers.end() ;
+         ++tempIter )
+    {
+        tempIter->second->redeliverRolledBackMessages() ;
+    }
+}
+
+/*
+ * Sends a message: casts it to ActiveMQMessage with p_dyncast and issues it
+ * as a synchronous request on the connection. The destination argument is
+ * not used by the current body (see the TODO below).
+ */
+void Session::doSend(p<IDestination> destination, p<IMessage> message)
+{
+    p<ActiveMQMessage> command = p_dyncast<ActiveMQMessage> (message) ;
+    // TODO complete packet
+    connection->syncRequest(command) ;
+}
+
+/*
+ * Begins a transaction on the transaction context; does nothing when the
+ * session is not transacted.
+ */
+void Session::doStartTransaction()
+{
+    if( !isTransacted() )
+        return ;
+
+    transactionContext->begin() ;
+}
+
+/*
+ * Wakes the dispatch thread, first calling its sleep(delay) when delay is
+ * greater than zero.
+ * NOTE(review): the unit of delay and which thread sleep() blocks are not
+ * visible from this file - confirm against DispatchThread.
+ */
+void Session::dispatch(int delay)
+{
+    if( delay > 0 ) 
+        dispatchThread->sleep(delay) ;
+
+    dispatchThread->wakeup() ;
+}
+
+/*
+ * Calls dispatchAsyncMessages() on every consumer stored in this session,
+ * inside a LOCKED_SCOPE on the session mutex.
+ */
+void Session::dispatchAsyncMessages()
+{
+    // Only one thread at a time may dispatch through the consumers
+    LOCKED_SCOPE (mutex);
+
+    // Walk the consumers created by this session so that each one delivers
+    // whatever it has pending
+    map<long long, p<MessageConsumer> >::const_iterator entry = consumers.begin() ;
+    while( entry != consumers.end() )
+    {
+        entry->second->dispatchAsyncMessages() ;
+        ++entry ;
+    }
+}
+
+/*
+ * Closes the session. Closes every consumer and producer, empties both
+ * maps, de-registers the session from the broker through the connection,
+ * drops the connection reference and marks the session closed. Calling it
+ * again on a closed session does nothing.
+ *
+ * NOTE(review): the maps are modified here without taking the session
+ * mutex that dispatchAsyncMessages() holds, and the dispatch thread is not
+ * stopped - confirm whether either is needed.
+ */
+void Session::close()
+{
+    if( closed )
+        return ;
+
+    map<long long, p<MessageConsumer> >::iterator consumerIter ;
+    map<long long, p<MessageProducer> >::iterator producerIter ;
+
+    // Iterate through all consumers and close them down
+    for( consumerIter = consumers.begin() ;
+         consumerIter != consumers.end() ;
+         ++consumerIter )
+    {
+        consumerIter->second->close() ;
+    }
+
+    // Remove the entries instead of leaving NULL placeholders behind:
+    // rollback() and dispatchAsyncMessages() call through every mapped
+    // pointer they find
+    consumers.clear() ;
+
+    // Iterate through all producers and close them down
+    for( producerIter = producers.begin() ;
+         producerIter != producers.end() ;
+         ++producerIter )
+    {
+        producerIter->second->close() ;
+    }
+    producers.clear() ;
+
+    // De-register session from broker
+    connection->disposeOf( sessionInfo->getSessionId() ) ;
+
+    // Clean up
+    connection = NULL ;
+    closed     = true ;
+}
+
+
+// Implementation methods ------------------------------------------
+
+/*
+ * Builds the ConsumerInfo command for a new consumer. The consumer id takes
+ * its connection id and session id from this session's id, and its value
+ * from the incremented consumer counter. The info carries that id, the
+ * destination cast to ActiveMQDestination, the selector (NULL when none was
+ * given) and this session's prefetch size.
+ */
+p<ConsumerInfo> Session::createConsumerInfo(p<IDestination> destination, const char* selector)
+{
+    p<ConsumerInfo> info      = new ConsumerInfo() ;
+    p<ConsumerId>   id        = new ConsumerId() ;
+    p<SessionId>    sessionId = sessionInfo->getSessionId() ;
+
+    id->setConnectionId( sessionId->getConnectionId() ) ;
+    id->setSessionId( sessionId->getValue() ) ;
+
+    {
+        // The counter is incremented inside a LOCKED_SCOPE on the session mutex
+        LOCKED_SCOPE (mutex);
+        id->setValue( ++consumerCounter ) ;
+    }
+
+    // A missing selector stays NULL rather than becoming an empty string
+    p<string> selectorText = ( selector == NULL ) ? NULL : new string(selector) ;
+
+    // TODO complete packet
+    info->setConsumerId( id ) ;
+    info->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ;
+    info->setSelector( selectorText ) ;
+    info->setPrefetchSize( this->prefetchSize ) ;
+
+    return info ;
+}
+
+/*
+ * Builds the ProducerInfo command for a new producer. The producer id takes
+ * its connection id and session id from this session's id, and its value
+ * from the incremented producer counter. The info carries that id and the
+ * destination cast to ActiveMQDestination.
+ */
+p<ProducerInfo> Session::createProducerInfo(p<IDestination> destination)
+{
+    p<ProducerInfo> info      = new ProducerInfo() ;
+    p<ProducerId>   id        = new ProducerId() ;
+    p<SessionId>    sessionId = sessionInfo->getSessionId() ;
+
+    id->setConnectionId( sessionId->getConnectionId() ) ;
+    id->setSessionId( sessionId->getValue() ) ;
+
+    {
+        // The counter is incremented inside a LOCKED_SCOPE on the session mutex
+        LOCKED_SCOPE (mutex);
+        id->setValue( ++producerCounter ) ;
+    }
+
+    // TODO complete packet
+    info->setProducerId( id ) ;
+    info->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ;
+
+    return info ;
+}
+
+/*
+ * Configures the message command. Called by every create*Message() factory
+ * on the message it has just built; the body is currently an empty stub.
+ */
+void Session::configure(p<IMessage> message)
+{
+    // TODO: not implemented yet, the message is left untouched
+}

Propchange: incubator/activemq/branches/activemq-4.0/openwire-cpp/src/main/cpp/activemq/Session.cpp
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message