activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1005483 - in /activemq/activemq-cpp/trunk/activemq-c/src/main/c: CMS_MessageConsumer.cpp CMS_MessageConsumer.h
Date Thu, 07 Oct 2010 15:05:44 GMT
Author: tabish
Date: Thu Oct  7 15:05:44 2010
New Revision: 1005483

URL: http://svn.apache.org/viewvc?rev=1005483&view=rev
Log:
Mostly complete impl for the sync Message Consumer.

Modified:
    activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.h

Modified: activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.cpp?rev=1005483&r1=1005482&r2=1005483&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.cpp Thu Oct  7 15:05:44
2010
@@ -17,3 +17,214 @@
 
 #include <CMS_MessageConsumer.h>
 
+#include <Config.h>
+#include <types/CMS_Types.h>
+
+#ifdef HAVE_STDLIB_H
+#include <stdlib.h>
+#endif
+
+#include <memory>
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status createDefaultConsumer(CMS_Session* session, CMS_Destination* destination, CMS_MessageConsumer**
consumer) {
+
+    cms_status result = CMS_SUCCESS;
+    std::auto_ptr<CMS_MessageConsumer> wrapper( new CMS_MessageConsumer );
+
+    try{
+
+        if (session == NULL || destination == NULL) {
+            result = CMS_ERROR;
+        } else {
+            wrapper->consumer = session->session->createConsumer(destination->destination);
+        }
+
+        *consumer = wrapper.release();
+    } catch(...) {
+        result = CMS_ERROR;
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status createConsumer(CMS_Session* session, CMS_Destination* destination, CMS_MessageConsumer**
consumer, const char* selector, int noLocal) {
+
+    cms_status result = CMS_SUCCESS;
+    std::auto_ptr<CMS_MessageConsumer> wrapper( new CMS_MessageConsumer );
+
+    try{
+
+        if (session == NULL || destination == NULL) {
+            result = CMS_ERROR;
+        } else {
+            wrapper->consumer = session->session->createConsumer(
+                destination->destination, selector, noLocal > 0 ? true : false);
+        }
+
+        *consumer = wrapper.release();
+    } catch(...) {
+        result = CMS_ERROR;
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status createDurableConsumer(CMS_Session* session,
+                                 CMS_Destination* destination,
+                                 CMS_MessageConsumer** consumer,
+                                 const char* subscriptionName,
+                                 const char* selector,
+                                 int noLocal) {
+
+    cms_status result = CMS_SUCCESS;
+    std::auto_ptr<CMS_MessageConsumer> wrapper( new CMS_MessageConsumer );
+
+    try{
+
+        if (session == NULL || destination == NULL) {
+            result = CMS_ERROR;
+        } else {
+
+            if (destination->type != CMS_TOPIC) {
+                result = CMS_INVALID_DESTINATION;
+            } else {
+
+                cms::Topic* topic = dynamic_cast<cms::Topic*>(destination->destination);
+
+                wrapper->consumer = session->session->createDurableConsumer(
+                    topic, subscriptionName, selector, noLocal > 0 ? true : false);
+
+                *consumer = wrapper.release();
+            }
+        }
+
+    } catch(...) {
+        result = CMS_ERROR;
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status consumerReceive(CMS_MessageConsumer* consumer, CMS_Message** message) {
+
+    cms_status result = CMS_SUCCESS;
+
+    if(consumer != NULL) {
+
+        try{
+
+            std::auto_ptr<CMS_Message> wrapper( new CMS_Message );
+
+            cms::Message* msg = consumer->consumer->receive();
+
+            wrapper->message = msg;
+            // TODO set the message type
+
+            *message = wrapper.release();
+
+        } catch(...) {
+            result = CMS_ERROR;
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status consumerReceiveWithTimeout(CMS_MessageConsumer* consumer, CMS_Message** message,
int timeout) {
+
+    cms_status result = CMS_SUCCESS;
+
+    if(consumer != NULL) {
+
+        try{
+
+            std::auto_ptr<CMS_Message> wrapper( new CMS_Message );
+
+            cms::Message* msg = consumer->consumer->receive(timeout);
+
+            if (msg != NULL) {
+                wrapper->message = msg;
+                // TODO set the message type
+                *message = wrapper.release();
+            } else {
+                *message = NULL;
+            }
+
+        } catch(...) {
+            result = CMS_ERROR;
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status consumerReceiveNoWait(CMS_MessageConsumer* consumer, CMS_Message** message) {
+
+    cms_status result = CMS_SUCCESS;
+
+    if(consumer != NULL) {
+
+        try{
+
+            std::auto_ptr<CMS_Message> wrapper( new CMS_Message );
+
+            cms::Message* msg = consumer->consumer->receive();
+
+            if (msg != NULL) {
+                wrapper->message = msg;
+
+                // TODO set the message type
+
+                *message = wrapper.release();
+            } else {
+                *message = NULL;
+            }
+
+        } catch(...) {
+            result = CMS_ERROR;
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status closeConsumer(CMS_MessageConsumer* consumer) {
+
+    cms_status result = CMS_SUCCESS;
+
+    if(consumer != NULL) {
+
+        try{
+            consumer->consumer->close();
+        } catch(...) {
+            result = CMS_ERROR;
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms_status destroyConsumer(CMS_MessageConsumer* consumer) {
+
+    cms_status result = CMS_SUCCESS;
+
+    if(consumer != NULL) {
+
+        try{
+            delete consumer->consumer;
+            delete consumer;
+        } catch(...) {
+            result = CMS_ERROR;
+        }
+    }
+
+    return result;
+}

Modified: activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.h?rev=1005483&r1=1005482&r2=1005483&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-c/src/main/c/CMS_MessageConsumer.h Thu Oct  7 15:05:44
2010
@@ -24,6 +24,131 @@
 extern "C" {
 #endif
 
+/**
+ * Given a Session instance, create a new Consumer with the default settings.
+ *
+ * @param session
+ *      The Session that is to be used to create the new Consumer.
+ * @param destination
+ *      The Destination that this consumer will subscribe to.
+ * @param consumer
+ *      The memory location where the newly allocated Consumer instance is to be stored.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status createDefaultConsumer(CMS_Session* session, CMS_Destination* destination, CMS_MessageConsumer**
consumer);
+
+/**
+ * Given a Session instance, create a new Consumer with the specified Consumer
+ * ack mode.
+ *
+ * @param session
+ *      The Session that is to be used to create the new Consumer.
+ * @param destination
+ *      The Destination that this consumer will subscribe to.
+ * @param consumer
+ *      The memory location where the newly allocated Consumer instance is to be stored.
+ * @param selector
+ *      The selector used to filter Messages on this Consumer, can be NULL.
+ * @param noLocal
+ *      Boolean indicating whether locally produced Messages are delivered to this Consumer.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status createConsumer(CMS_Session* session,
+                          CMS_Destination* destination,
+                          CMS_MessageConsumer** consumer,
+                          const char* selector,
+                          int noLocal);
+
+/**
+ * Given a Session instance, create a new Consumer with the specified Consumer
+ * ack mode.
+ *
+ * @param session
+ *      The Session that is to be used to create the new Consumer.
+ * @param destination
+ *      The Destination that this consumer will subscribe to.
+ * @param consumer
+ *      The memory location where the newly allocated Consumer instance is to be stored.
+ * @param subscriptionName
+ *      The name to assign the durable subscription.
+ * @param selector
+ *      The selector used to filter Messages on this Consumer, can be NULL.
+ * @param noLocal
+ *      Boolean indicating whether locally produced Messages are delivered to this Consumer.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status createDurableConsumer(CMS_Session* session,
+                                 CMS_Destination* destination,
+                                 CMS_MessageConsumer** consumer,
+                                 const char* subscriptionName,
+                                 const char* selector,
+                                 int noLocal);
+
+/**
+ * Waits for a Message to become available and stores it in the Location passed to this
+ * method.  The client will block indefinitely waiting for a Message, to interrupt the
+ * client call another thread must call the closeConsumer method.
+ *
+ * @param consumer
+ *      The MessageConsumer that will be used to receive the Message.
+ * @param message
+ *      The Memory location where a newly received message is to be stored.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status consumerReceive(CMS_MessageConsumer* consumer, CMS_Message** message);
+
+/**
+ * Waits for a Message to become available and stores it in the Location passed to this
+ * method.  The client will block until the timeout elapses waiting for a Message, to interrupt
+ * the client call another thread must call the closeConsumer method.
+ *
+ * @param consumer
+ *      The MessageConsumer that will be used to receive the Message.
+ * @param message
+ *      The Memory location where a newly received message is to be stored.
+ * @param timeout
+ *      The time in milliseconds to wait for a Message to arrive.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status consumerReceiveWithTimeout(CMS_MessageConsumer* consumer, CMS_Message** message,
int timeout);
+
+/**
+ * Checks for a Message that's available and stores it in the Location passed to this
+ * method.  If a message is not immediately available then this method returns and the
+ * message store location is set to null.
+ *
+ * @param consumer
+ *      The MessageConsumer that will be used to receive the Message.
+ * @param message
+ *      The Memory location where a newly received message is to be stored.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status consumerReceiveNoWait(CMS_MessageConsumer* consumer, CMS_Message** message);
+
+/**
+ * Closes the MessageConsumer, interrupting any currently blocked receive calls.
+ *
+ * @param consumer
+ *      The Consumer that is to be destroyed.
+ */
+cms_status closeConsumer(CMS_MessageConsumer* consumer);
+
+/**
+ * Destroys the given Consumer instance.
+ *
+ * @param consumer
+ *      The Consumer that is to be destroyed.
+ *
+ * @return result code indicating the success or failure of the operation.
+ */
+cms_status destroyConsumer(CMS_MessageConsumer* consumer);
+
 #ifdef __cplusplus
 }
 #endif



Mime
View raw message