qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cctriel...@apache.org
Subject svn commit: r566289 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: BrokerQueue.cpp BrokerQueue.h MessageStore.h MessageStoreModule.cpp MessageStoreModule.h NullMessageStore.cpp NullMessageStore.h PersistableMessage.h PersistableQueue.h TxPublish.cpp
Date Wed, 15 Aug 2007 18:13:03 GMT
Author: cctrieloff
Date: Wed Aug 15 11:13:02 2007
New Revision: 566289

URL: http://svn.apache.org/viewvc?view=rev&rev=566289
Log:

async IO for broker store


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Wed Aug 15 11:13:02 2007
@@ -56,6 +56,14 @@
 
 Queue::~Queue(){}
 
+void Queue::notifyDurableIOComplete()
+{
+    // signal SemanticHander to ack completed dequeues
+    // then dispatch to ack...
+    serializer.execute(dispatchCallback);
+}
+
+
 void Queue::deliver(Message::shared_ptr& msg){
     if (msg->isImmediate() && getConsumerCount() == 0) {
         if (alternateExchange) {
@@ -63,11 +71,20 @@
             alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
         }
     } else {
-        enqueue(0, msg);
-        process(msg);
+
+
+	// if no store then mark as enqueued
+        if (!enqueue(0, msg)){
+            push(msg);
+	    msg->enqueueComplete();
+	}else {
+            push(msg);
+	}
+        serializer.execute(dispatchCallback);
     }
 }
 
+
 void Queue::recover(Message::shared_ptr& msg){
     push(msg);
     if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
@@ -127,6 +144,7 @@
 
 void Queue::dispatch(){
 
+
      Message::shared_ptr msg;
      while(true){
         {
@@ -134,7 +152,7 @@
 	    if (messages.empty()) break; 
 	    msg = messages.front();
 	}
-        if( dispatch(msg) ){
+        if( msg->isEnqueueComplete() && dispatch(msg) ){
             pop();
         }else break;
 	
@@ -215,19 +233,26 @@
     return autodelete && consumers.size() == 0;
 }
 
-void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
+// return true if store exists, 
+bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
 {
     if (msg->isPersistent() && store) {
         store->enqueue(ctxt, *msg.get(), *this);
+	return true;
     }
+    return false;
 }
 
-void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
+// return true if store exists, 
+bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
 {
     if (msg->isPersistent() && store) {
         store->dequeue(ctxt, *msg.get(), *this);
+	return true;
     }
+    return false;
 }
+
 
 namespace 
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Wed Aug 15 11:13:02 2007
@@ -87,6 +87,12 @@
 	     */
             void dispatch();
  
+        protected:
+	   /**
+	   * Call back from store
+	   */
+  	   virtual void notifyDurableIOComplete();
+
         public:
             
             typedef boost::shared_ptr<Queue> shared_ptr;
@@ -143,11 +149,11 @@
 
             bool canAutoDelete() const;
 
-            void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
+            bool enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
             /**
              * dequeue from store (only done once messages is acknowledged)
              */
-            void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
+            bool dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
             /**
              * dequeues from memory only
              */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Wed Aug 15 11:13:02 2007
@@ -104,7 +104,10 @@
     /**
      * Enqueues a message, storing the message if it has not
      * been previously stored and recording that the given
-     * message is on the given queue.
+     * message is on the given queue. 
+     *
+     * Note: that this is async so the return of the function does
+     * not mean the opperation is complete.
      * 
      * @param msg the message to enqueue
      * @param queue the name of the queue onto which it is to be enqueued
@@ -113,18 +116,34 @@
      * place or null for 'local' transactions
      */
     virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue&
queue) = 0;
+    
     /**
      * Dequeues a message, recording that the given message is
      * no longer on the given queue and deleting the message
      * if it is no longer on any other queue.
+     *
+     * Note: that this is async so the return of the function does
+     * not mean the opperation is complete.
      * 
      * @param msg the message to dequeue
-     * @param queue the name of th queue from which it is to be dequeued
+     * @param queue the name of the queue from which it is to be dequeued
      * @param xid (a pointer to) an identifier of the
      * distributed transaction in which the operation takes
      * place or null for 'local' transactions
      */
     virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue&
queue) = 0;
+
+
+   /**
+     * Returns the number of outstanding AIO's for a given queue
+     * 
+     * If 0, than all the enqueue / dequeues have been stored 
+     * to disk
+     *
+     * @param queue the name of the queue to check for outstanding AIO
+     */
+    virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue) = 0;
+
     
     virtual ~MessageStore(){}
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Wed Aug 15 11:13:02
2007
@@ -95,6 +95,11 @@
     store->dequeue(ctxt, msg, queue);
 }
 
+u_int32_t MessageStoreModule::outstandingQueueAIO(const PersistableQueue& queue)
+{
+    return store->outstandingQueueAIO(queue);
+}
+
 std::auto_ptr<TransactionContext> MessageStoreModule::begin()
 {
     return store->begin();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Wed Aug 15 11:13:02
2007
@@ -59,8 +59,10 @@
     void destroy(PersistableMessage& msg);
     void appendContent(PersistableMessage& msg, const std::string& data);
     void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset,
uint32_t length);
+
     void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue&
queue);
     void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue&
queue);
+    u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
 
     ~MessageStoreModule(){}
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Wed Aug 15 11:13:02
2007
@@ -97,14 +97,21 @@
     QPID_LOG(info, "Can't load content. Persistence not enabled.");
 }
 
-void NullMessageStore::enqueue(TransactionContext*, PersistableMessage&, const PersistableQueue&
queue)
+void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue&
queue)
 {
+    msg.enqueueComplete(); 
     QPID_LOG(info, "Can't enqueue message onto '" << queue.getName() << "'. Persistence
not enabled.");
 }
 
-void NullMessageStore::dequeue(TransactionContext*, PersistableMessage&, const PersistableQueue&
queue)
+void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue&
queue)
 {
+    msg.dequeueComplete();
     QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'. Persistence
not enabled.");
+}
+
+u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& )
+{
+    return 0;
 }
 
 std::auto_ptr<TransactionContext> NullMessageStore::begin()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Wed Aug 15 11:13:02 2007
@@ -62,6 +62,7 @@
     virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t
offset, uint32_t length);
     virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue&
queue);
     virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue&
queue);
+    virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
     ~NullMessageStore(){}
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Wed Aug 15 11:13:02
2007
@@ -36,6 +36,23 @@
  */
     class PersistableMessage : public Persistable
 {
+
+
+    /**
+    * Needs to be set false on Message construction, then
+    * set once the broker has taken responsibility for the
+    * message. For transient, once enqueued, for durable, once
+    * stored.
+    */
+    bool enqueueCompleted;
+    /**
+    * Needs to be set false on Message construction, then
+    * set once the dequeueis complete, it gets set
+    * For transient, once dequeued, for durable, once
+    * dequeue record has been stored.
+    */
+    bool dequeueCompleted;
+
 public:
     typedef boost::shared_ptr<PersistableMessage> shared_ptr;
 
@@ -45,6 +62,15 @@
     virtual uint32_t encodedHeaderSize() const = 0;
 
     virtual ~PersistableMessage() {};
+    PersistableMessage():
+    enqueueCompleted(false),
+    dequeueCompleted(false){};
+    
+    inline bool isEnqueueComplete() {return enqueueCompleted;};
+    inline void enqueueComplete() {enqueueCompleted = true;};
+    inline bool isDequeueComplete() {return dequeueCompleted;};
+    inline void dequeueComplete() {dequeueCompleted = true;};
+    
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h Wed Aug 15 11:13:02 2007
@@ -35,8 +35,20 @@
 class PersistableQueue : public Persistable
 {
 public:
+
     virtual const std::string& getName() const = 0;
     virtual ~PersistableQueue() {};
+    
+protected:
+    /**
+    * call back for the store to signal AIO writes have
+    * completed (enqueue/dequeue etc)
+    *
+    * Note: DO NOT do work on this callback, if you block
+    * this callback you will block the store.
+    */
+    virtual void notifyDurableIOComplete()  = 0;
+    
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Wed Aug 15 11:13:02 2007
@@ -51,7 +51,14 @@
     : ctxt(_ctxt), msg(_msg){}
 
 void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
-    queue->enqueue(ctxt, msg);
+    if (!queue->enqueue(ctxt, msg)){
+        /**
+	* if not store then mark message for ack and deleivery once
+	* commit happens, as async IO will never set it when no store
+	* exists
+	*/
+	msg->enqueueComplete();
+    }
 }
 
 TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}



Mime
View raw message