qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cctriel...@apache.org
Subject svn commit: r587332 - in /incubator/qpid/trunk/qpid/cpp/src: Makefile.am qpid/broker/IncomingExecutionContext.cpp qpid/broker/PersistableMessage.h qpid/broker/PersistableQueue.h qpid/broker/Queue.cpp
Date Tue, 23 Oct 2007 00:29:33 GMT
Author: cctrieloff
Date: Mon Oct 22 17:29:32 2007
New Revision: 587332

URL: http://svn.apache.org/viewvc?rev=587332&view=rev
Log:
- flush async IO if present on sync for 0-10
- notify, for ack from sync for 0-10
- use of raw pointer, to avoid recursive free


Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=587332&r1=587331&r2=587332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Oct 22 17:29:32 2007
@@ -144,6 +144,7 @@
   qpid/broker/BrokerSingleton.cpp \
   qpid/broker/Exchange.cpp \
   qpid/broker/Queue.cpp \
+  qpid/broker/PersistableMessage.cpp \
   qpid/broker/Connection.cpp \
   qpid/broker/ConnectionHandler.cpp \
   qpid/broker/ConnectionFactory.cpp \
@@ -199,6 +200,7 @@
   qpid/client/Connection.cpp		\
   qpid/client/Channel.cpp			\
   qpid/client/Exchange.cpp		\
+  qpid/broker/PersistableMessage.cpp \
   qpid/client/Queue.cpp			\
   qpid/client/ConnectionImpl.cpp		\
   qpid/client/Connector.cpp			\

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp?rev=587332&r1=587331&r2=587332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp Mon Oct 22 17:29:32 2007
@@ -112,6 +112,10 @@
 void IncomingExecutionContext::wait()
 {
     check();
+	// for IO flush on the store
+    for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); i++) {
+        (*i)->flush();
+    }
     incomplete.front()->waitForEnqueueComplete();
     flush();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=587332&r1=587331&r2=587332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Mon Oct 22 17:29:32 2007
@@ -23,14 +23,18 @@
  */
 
 #include <string>
+#include <list>
 #include <boost/shared_ptr.hpp>
 #include "Persistable.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/Monitor.h"
+#include "PersistableQueue.h"
 
 namespace qpid {
 namespace broker {
 
+class MessageStore;
+
 /**
  * The interface messages must expose to the MessageStore in order to
  * be persistable.
@@ -39,7 +43,8 @@
 {
     sys::Monitor asyncEnqueueLock;
     sys::Monitor asyncDequeueLock;
-
+	sys::Mutex storeLock;
+	
     /**
      * Tracks the number of outstanding asynchronous enqueue
      * operations. When the message is enqueued asynchronously the
@@ -57,7 +62,10 @@
      * dequeues.
      */
     int asyncDequeueCounter;
-
+protected:
+    typedef std::list<PersistableQueue*> syncList;
+	syncList synclist;
+	MessageStore* store;
 public:
     typedef boost::shared_ptr<PersistableMessage> shared_ptr;
 
@@ -70,8 +78,11 @@
 
     PersistableMessage(): 
     	asyncEnqueueCounter(0), 
-    	asyncDequeueCounter(0) 
+    	asyncDequeueCounter(0),
+        store(0) 
 	{}
+
+    void flush();
     
     inline void waitForEnqueueComplete() {
         sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
@@ -94,6 +105,15 @@
         }
     }
 
+    inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) { 
+		if (_store){
+			sys::ScopedLock<sys::Mutex> l(storeLock);
+		    store = _store;
+		    synclist.push_back(queue);
+		}
+	    enqueueAsync();
+	}
+
     inline void enqueueAsync() { 
         sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
         asyncEnqueueCounter++; 
@@ -105,6 +125,7 @@
     }
     
     inline void dequeueComplete() { 
+
         sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
         if (asyncDequeueCounter > 0) {
             if (--asyncDequeueCounter == 0) {
@@ -119,6 +140,15 @@
             asyncDequeueLock.wait();
         }
     }
+
+    inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) { 
+		if (_store){
+            sys::ScopedLock<sys::Mutex> l(storeLock);
+            store = _store;
+			synclist.push_back(queue);
+		}
+	    dequeueAsync();
+	}
 
     inline void dequeueAsync() { 
         sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h?rev=587332&r1=587331&r2=587332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h Mon Oct 22 17:29:32 2007
@@ -24,6 +24,7 @@
 
 #include <string>
 #include "Persistable.h"
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace broker {
@@ -49,16 +50,17 @@
 class PersistableQueue : public Persistable
 {
 public:
+    typedef boost::shared_ptr<PersistableQueue> shared_ptr;
 
     virtual const std::string& getName() const = 0;
     virtual ~PersistableQueue() {
         if (externalQueueStore) 
-	   delete externalQueueStore;
+             delete externalQueueStore;
     };
 
     inline void setExternalQueueStore(ExternalQueueStore* inst){
         if (externalQueueStore!=inst && externalQueueStore) 
-	   delete externalQueueStore; 
+             delete externalQueueStore; 
         externalQueueStore = inst;
     };
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=587332&r1=587331&r2=587332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 22 17:29:32 2007
@@ -347,7 +347,8 @@
 bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg)
 {
     if (msg->isPersistent() && store) {
-	msg->enqueueAsync(); //increment to async counter -- for message sent to more than one queue
+std::cout << "--------------  enqueue ------------" << std::endl << std::flush;
+        msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue
         store->enqueue(ctxt, *msg.get(), *this);
 	return true;
     }
@@ -358,7 +359,7 @@
 bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr msg)
 {
     if (msg->isPersistent() && store) {
-	msg->dequeueAsync(); //increment to async counter -- for message sent to more than one queue
+        msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue
         store->dequeue(ctxt, *msg.get(), *this);
 	return true;
     }



Mime
View raw message