Author: tabish
Date: Thu Oct 7 15:05:44 2010
New Revision: 1005483
URL: http://svn.apache.org/viewvc?rev=1005483&view=rev
Log:
Mostly complete impl for the sync Message Consumer.
Modified:
activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.cpp
activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.h
Modified: activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.cpp?rev=1005483&r1=1005482&r2=1005483&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.cpp Thu Oct 7 15:05:44
2010
@@ -17,3 +17,214 @@
#include <CMS_MessageConsumer.h>
+#include <Config.h>
+#include <types/CMS_Types.h>
+
+#ifdef HAVE_STDLIB_H
+#include <stdlib.h>
+#endif
+
+#include <memory>
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status createDefaultConsumer(CMS_Session* session, CMS_Destination* destination, CMS_MessageConsumer**
consumer) {
+
+ cms_status result = CMS_SUCCESS;
+ std::auto_ptr<CMS_MessageConsumer> wrapper( new CMS_MessageConsumer );
+
+ try{
+
+ if (session == NULL || destination == NULL) {
+ result = CMS_ERROR;
+ } else {
+ wrapper->consumer = session->session->createConsumer(destination->destination);
+ }
+
+ *consumer = wrapper.release();
+ } catch(...) {
+ result = CMS_ERROR;
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status createConsumer(CMS_Session* session, CMS_Destination* destination, CMS_MessageConsumer**
consumer, const char* selector, int noLocal) {
+
+ cms_status result = CMS_SUCCESS;
+ std::auto_ptr<CMS_MessageConsumer> wrapper( new CMS_MessageConsumer );
+
+ try{
+
+ if (session == NULL || destination == NULL) {
+ result = CMS_ERROR;
+ } else {
+ wrapper->consumer = session->session->createConsumer(
+ destination->destination, selector, noLocal > 0 ? true : false);
+ }
+
+ *consumer = wrapper.release();
+ } catch(...) {
+ result = CMS_ERROR;
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status createDurableConsumer(CMS_Session* session,
+ CMS_Destination* destination,
+ CMS_MessageConsumer** consumer,
+ const char* subscriptionName,
+ const char* selector,
+ int noLocal) {
+
+ cms_status result = CMS_SUCCESS;
+ std::auto_ptr<CMS_MessageConsumer> wrapper( new CMS_MessageConsumer );
+
+ try{
+
+ if (session == NULL || destination == NULL) {
+ result = CMS_ERROR;
+ } else {
+
+ if (destination->type != CMS_TOPIC) {
+ result = CMS_INVALID_DESTINATION;
+ } else {
+
+ cms::Topic* topic = dynamic_cast<cms::Topic*>(destination->destination);
+
+ wrapper->consumer = session->session->createDurableConsumer(
+ topic, subscriptionName, selector, noLocal > 0 ? true : false);
+
+ *consumer = wrapper.release();
+ }
+ }
+
+ } catch(...) {
+ result = CMS_ERROR;
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status consumerReceive(CMS_MessageConsumer* consumer, CMS_Message** message) {
+
+ cms_status result = CMS_SUCCESS;
+
+ if(consumer != NULL) {
+
+ try{
+
+ std::auto_ptr<CMS_Message> wrapper( new CMS_Message );
+
+ cms::Message* msg = consumer->consumer->receive();
+
+ wrapper->message = msg;
+ // TODO set the message type
+
+ *message = wrapper.release();
+
+ } catch(...) {
+ result = CMS_ERROR;
+ }
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status consumerReceiveWithTimeout(CMS_MessageConsumer* consumer, CMS_Message** message,
int timeout) {
+
+ cms_status result = CMS_SUCCESS;
+
+ if(consumer != NULL) {
+
+ try{
+
+ std::auto_ptr<CMS_Message> wrapper( new CMS_Message );
+
+ cms::Message* msg = consumer->consumer->receive(timeout);
+
+ if (msg != NULL) {
+ wrapper->message = msg;
+ // TODO set the message type
+ *message = wrapper.release();
+ } else {
+ *message = NULL;
+ }
+
+ } catch(...) {
+ result = CMS_ERROR;
+ }
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status consumerReceiveNoWait(CMS_MessageConsumer* consumer, CMS_Message** message) {
+
+ cms_status result = CMS_SUCCESS;
+
+ if(consumer != NULL) {
+
+ try{
+
+ std::auto_ptr<CMS_Message> wrapper( new CMS_Message );
+
+ cms::Message* msg = consumer->consumer->receive();
+
+ if (msg != NULL) {
+ wrapper->message = msg;
+
+ // TODO set the message type
+
+ *message = wrapper.release();
+ } else {
+ *message = NULL;
+ }
+
+ } catch(...) {
+ result = CMS_ERROR;
+ }
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status closeConsumer(CMS_MessageConsumer* consumer) {
+
+ cms_status result = CMS_SUCCESS;
+
+ if(consumer != NULL) {
+
+ try{
+ consumer->consumer->close();
+ } catch(...) {
+ result = CMS_ERROR;
+ }
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status destroyConsumer(CMS_MessageConsumer* consumer) {
+
+ cms_status result = CMS_SUCCESS;
+
+ if(consumer != NULL) {
+
+ try{
+ delete consumer->consumer;
+ delete consumer;
+ } catch(...) {
+ result = CMS_ERROR;
+ }
+ }
+
+ return result;
+}
Modified: activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.h?rev=1005483&r1=1005482&r2=1005483&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.h Thu Oct 7 15:05:44
2010
@@ -24,6 +24,131 @@
extern "C" {
#endif
+/**
+ * Given a Session instance, create a new Consumer with the default settings.
+ *
+ * @param session
+ * The Session that is to be used to create the new Consumer.
+ * @param destination
+ * The Destination that this consumer will subscribe to.
+ * @param consumer
+ * The memory location where the newly allocated Consumer instance is to be stored.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status createDefaultConsumer(CMS_Session* session, CMS_Destination* destination, CMS_MessageConsumer**
consumer);
+
+/**
+ * Given a Session instance, create a new Consumer with the specified Consumer
+ * ack mode.
+ *
+ * @param session
+ * The Session that is to be used to create the new Consumer.
+ * @param destination
+ * The Destination that this consumer will subscribe to.
+ * @param consumer
+ * The memory location where the newly allocated Consumer instance is to be stored.
+ * @param selector
+ * The selector used to filter Messages on this Consumer, can be NULL.
+ * @param noLocal
+ * Boolean indicating whether locally produced Messages are delivered to this Consumer.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status createConsumer(CMS_Session* session,
+ CMS_Destination* destination,
+ CMS_MessageConsumer** consumer,
+ const char* selector,
+ int noLocal);
+
+/**
+ * Given a Session instance, create a new Consumer with the specified Consumer
+ * ack mode.
+ *
+ * @param session
+ * The Session that is to be used to create the new Consumer.
+ * @param destination
+ * The Destination that this consumer will subscribe to.
+ * @param consumer
+ * The memory location where the newly allocated Consumer instance is to be stored.
+ * @param subscriptionName
+ * The name to assign the durable subscription.
+ * @param selector
+ * The selector used to filter Messages on this Consumer, can be NULL.
+ * @param noLocal
+ * Boolean indicating whether locally produced Messages are delivered to this Consumer.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status createDurableConsumer(CMS_Session* session,
+ CMS_Destination* destination,
+ CMS_MessageConsumer** consumer,
+ const char* subscriptionName,
+ const char* selector,
+ int noLocal);
+
+/**
+ * Waits for a Message to become available and stores it in the Location passed to this
+ * method. The client will block indefinitely waiting for a Message, to interrupt the
+ * client call another thread must call the closeConsumer method.
+ *
+ * @param consumer
+ * The MessageConsumer that will be used to receive the Message.
+ * @param message
+ * The Memory location where a newly received message is to be stored.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status consumerReceive(CMS_MessageConsumer* consumer, CMS_Message** message);
+
+/**
+ * Waits for a Message to become available and stores it in the Location passed to this
+ * method. The client will block until the timeout elapses waiting for a Message, to interrupt
+ * the client call another thread must call the closeConsumer method.
+ *
+ * @param consumer
+ * The MessageConsumer that will be used to receive the Message.
+ * @param message
+ * The Memory location where a newly received message is to be stored.
+ * @param timeout
+ * The time in milliseconds to wait for a Message to arrive.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status consumerReceiveWithTimeout(CMS_MessageConsumer* consumer, CMS_Message** message,
int timeout);
+
+/**
+ * Checks for a Message that's available and stores it in the Location passed to this
+ * method. If a message is not immediately available then this method returns and the
+ * message store location is set to null.
+ *
+ * @param consumer
+ * The MessageConsumer that will be used to receive the Message.
+ * @param message
+ * The Memory location where a newly received message is to be stored.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status consumerReceiveNoWait(CMS_MessageConsumer* consumer, CMS_Message** message);
+
+/**
+ * Closes the MessageConsumer, interrupting any currently blocked receive calls.
+ *
+ * @param consumer
+ * The Consumer that is to be destroyed.
+ */
+cms_status closeConsumer(CMS_MessageConsumer* consumer);
+
+/**
+ * Destroys the given Consumer instance.
+ *
+ * @param consumer
+ * The Consumer that is to be destroyed.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status destroyConsumer(CMS_MessageConsumer* consumer);
+
#ifdef __cplusplus
}
#endif
|