activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r617997 - in /activemq/activemq-cpp/trunk/src/main/activemq/connector: Connector.h openwire/OpenWireConnector.cpp openwire/OpenWireConnector.h stomp/StompConnector.cpp stomp/StompConnector.h
Date Sun, 03 Feb 2008 15:22:16 GMT
Author: tabish
Date: Sun Feb  3 07:22:15 2008
New Revision: 617997

URL: http://svn.apache.org/viewvc?rev=617997&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-161

Adding support in the connector for issuing a message pull command in the case where the prefetch
size is set to zero.

Stomp does not support this, so an UnsupportedOperationException is thrown if pullMessage is called; the caller is
expected to call isMessagePullSupported first to make sure it is safe.

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h?rev=617997&r1=617996&r2=617997&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h Sun Feb  3 07:22:15
2008
@@ -356,6 +356,27 @@
         virtual void setExceptionListener(
             cms::ExceptionListener* listener ) = 0;
 
+        /**
+         * Checks if this connector supports pull of a new mesage from the service
+         * provider, if so then the user can call pullMessage() on the Connector
+         * to try and get a new message added to the receive queue.
+         * @returns true if the caller can use pullMessage without an exception
+         */
+        virtual bool isMessagePullSupported() const = 0;
+
+        /**
+         * Pulls a message from the the service provider that this Connector is
+         * associated with. This could be because the service has a prefetch
+         * policy that is set to zero and therefor requires each message to
+         * be pulled from the server to the client via a poll.
+         * @param info - the consumer info for the consumer to pull for
+         * @param timeout - the time that the caller is going to wait for new messages
+         * @throw ConnectorException if a communications error occurs
+         * @throw UnsupportedOperationException if the connector can't pull
+         */
+        virtual void pullMessage( connector::ConsumerInfo* info, long long timeout )
+            throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException
) = 0;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=617997&r1=617996&r2=617997&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sun Feb  3 07:22:15 2008
@@ -47,6 +47,7 @@
 #include <activemq/connector/openwire/commands/DestinationInfo.h>
 #include <activemq/connector/openwire/commands/ExceptionResponse.h>
 #include <activemq/connector/openwire/commands/Message.h>
+#include <activemq/connector/openwire/commands/MessagePull.h>
 #include <activemq/connector/openwire/commands/MessageAck.h>
 #include <activemq/connector/openwire/commands/MessageDispatch.h>
 #include <activemq/connector/openwire/commands/RemoveInfo.h>
@@ -1203,6 +1204,33 @@
         throw OpenWireConnectorException( __FILE__, __LINE__,
             "caught unknown exception" );
     }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::pullMessage( connector::ConsumerInfo* info, long long timeout )
+    throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException )
{
+
+    try {
+
+        OpenWireConsumerInfo* consumer =
+             dynamic_cast<OpenWireConsumerInfo*>( info );
+
+         if( consumer->getConsumerInfo()->getPrefetchSize() == 0 ) {
+
+             commands::MessagePull messagePull;
+             messagePull.setConsumerId(
+                 consumer->getConsumerInfo()->getConsumerId()->cloneDataStructure()
);
+             messagePull.setDestination(
+                 consumer->getConsumerInfo()->getDestination()->cloneDataStructure()
);
+             messagePull.setTimeout( timeout );
+
+             // TODO - This should be Async
+             this->oneway( &messagePull );
+         }
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=617997&r1=617996&r2=617997&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Sun
Feb  3 07:22:15 2008
@@ -558,6 +558,29 @@
             this->exceptionListener = listener;
         }
 
+        /**
+         * Checks if this connector supports pull of a new mesage from the service
+         * provider, if so then the user can call pullMessage() on the Connector
+         * to try and get a new message added to the receive queue.
+         * @returns true if the caller can use pullMessage without an exception
+         */
+        virtual bool isMessagePullSupported() const {
+            return true;
+        }
+
+        /**
+         * Pulls a message from the the service provider that this Connector is
+         * associated with. This could be because the service has a prefetch
+         * policy that is set to zero and therefor requires each message to
+         * be pulled from the server to the client via a poll.
+         * @param info - the consumer info for the consumer to pull for
+         * @param timeout - the time that the caller is going to wait for new messages
+         * @throw ConnectorException if a communications error occurs
+         * @throw UnsupportedOperationException if the connector can't pull
+         */
+        virtual void pullMessage( connector::ConsumerInfo* info, long long timeout )
+            throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException
);
+
     public: // transport::CommandListener
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp?rev=617997&r1=617996&r2=617997&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp Sun Feb
 3 07:22:15 2008
@@ -714,6 +714,21 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void StompConnector::pullMessage( connector::ConsumerInfo* info AMQCPP_UNUSED, long long
timeout AMQCPP_UNUSED )
+    throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException )
{
+
+    try {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__,
+            "StompConnector::pullMessage - No Stomp Support for Message Pull");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void StompConnector::closeResource( ConnectorResource* resource )
     throw ( ConnectorException ) {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h?rev=617997&r1=617996&r2=617997&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h Sun Feb
 3 07:22:15 2008
@@ -477,6 +477,29 @@
             this->exceptionListener = listener;
         }
 
+        /**
+         * Checks if this connector supports pull of a new mesage from the service
+         * provider, if so then the user can call pullMessage() on the Connector
+         * to try and get a new message added to the receive queue.
+         * @returns true if the caller can use pullMessage without an exception
+         */
+        virtual bool isMessagePullSupported() const {
+            return false;
+        }
+
+        /**
+         * Pulls a message from the the service provider that this Connector is
+         * associated with. This could be because the service has a prefetch
+         * policy that is set to zero and therefor requires each message to
+         * be pulled from the server to the client via a poll.
+         * @param info - the consumer info for the consumer to pull for
+         * @param timeout - the time that the caller is going to wait for new messages
+         * @throw ConnectorException if a communications error occurs
+         * @throw UnsupportedOperationException if the connector can't pull
+         */
+        virtual void pullMessage( connector::ConsumerInfo* info, long long timeout )
+            throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException
);
+
     public: // transport::CommandListener
 
         /**



Mime
View raw message