qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1133535 - in /qpid/branches/qpid-3079/qpid/cpp/src: qpid/broker/ tests/
Date Wed, 08 Jun 2011 19:55:03 GMT
Author: kgiusti
Date: Wed Jun  8 19:55:02 2011
New Revision: 1133535

URL: http://svn.apache.org/viewvc?rev=1133535&view=rev
Log:
QPID-3079: fixes that allow persistence unit tests to pass

Modified:
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoverableQueue.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/tests/AsyncCompletion.cpp

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h?rev=1133535&r1=1133534&r2=1133535&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h Wed Jun  8 19:55:02
2011
@@ -56,24 +56,20 @@ public:
     typedef boost::shared_ptr<PersistableQueue> shared_ptr;
 
     virtual const std::string& getName() const = 0;
-    virtual ~PersistableQueue() {
-        if (externalQueueStore) 
-             delete externalQueueStore;
-    };
+    virtual ~PersistableQueue() {};
 
-    virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
+    virtual void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>&
inst) = 0;
     virtual void flush() = 0;
     
-    inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;};
+    inline boost::shared_ptr<ExternalQueueStore> getExternalQueueStore() const {return
externalQueueStore;};
     
-    PersistableQueue():externalQueueStore(NULL){
-    };
+    PersistableQueue() {};
 
     /** the message has finished being dequeued from the store */
     virtual void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>&)
= 0;
     
 protected:
-    ExternalQueueStore* externalQueueStore;
+    boost::shared_ptr<ExternalQueueStore> externalQueueStore;
     
 };
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1133535&r1=1133534&r2=1133535&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jun  8 19:55:02 2011
@@ -684,6 +684,7 @@ bool Queue::dequeue(TransactionContext* 
                 Mutex::ScopedLock locker(messageLock);
                 pendingDequeueCallbacks[pmsg.get()] = callback;
             }
+            QPID_LOG(debug, "Message " << pmsg << " async dequeue started on
queue " << name);
             store->dequeue(ctxt, pmsg, shared_from_this());  // invokes Queue::dequeueComplete()
when done
             return true;
         }
@@ -1041,9 +1042,8 @@ bool Queue::hasExclusiveConsumer() const
     return exclusive; 
 }
 
-void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
-    if (externalQueueStore!=inst && externalQueueStore) 
-        delete externalQueueStore; 
+void Queue::setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>&
inst)
+{
     externalQueueStore = inst;
 
     if (inst) {
@@ -1215,6 +1215,10 @@ const Broker* Queue::getBroker()
  * message has completed. */
 void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg
)
 {
+    ScopedUse u(barrier);
+    if (!u.acquired) return;
+
+    QPID_LOG(debug, "Message " << msg << " dequeue complete on queue " <<
name);
     msg->dequeueComplete(shared_from_this(), store);
 
     boost::shared_ptr<DequeueDoneCallback> cb;
@@ -1228,7 +1232,6 @@ void Queue::dequeueComplete(const boost:
         }
     }
     if (cb) (*cb)();
-    QPID_LOG(error, "dequeueComplete:=" << cb);
 }
 
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h?rev=1133535&r1=1133534&r2=1133535&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h Wed Jun  8 19:55:02 2011
@@ -351,7 +351,7 @@ class Queue : public boost::enable_share
     static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
     static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
 
-    virtual void setExternalQueueStore(ExternalQueueStore* inst);
+    virtual void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>&
inst);
 
     // Manageable entry points
     management::ManagementObject* GetManagementObject (void) const;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoverableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoverableQueue.h?rev=1133535&r1=1133534&r2=1133535&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoverableQueue.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoverableQueue.h Wed Jun  8 19:55:02
2011
@@ -48,8 +48,8 @@ public:
     virtual ~RecoverableQueue() {};
 
     virtual const std::string& getName() const = 0;
-    virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
-	virtual ExternalQueueStore* getExternalQueueStore() const = 0;
+    virtual void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>&
inst) = 0;
+	virtual boost::shared_ptr<ExternalQueueStore> getExternalQueueStore() const = 0;
 
 };
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1133535&r1=1133534&r2=1133535&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Wed Jun  8 19:55:02
2011
@@ -65,8 +65,8 @@ public:
     void setPersistenceId(uint64_t id);    
 	uint64_t getPersistenceId() const;
     const std::string& getName() const;
-    void setExternalQueueStore(ExternalQueueStore* inst);
-    ExternalQueueStore* getExternalQueueStore() const;
+    void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst);
+    boost::shared_ptr<ExternalQueueStore> getExternalQueueStore() const;
     void recover(RecoverableMessage::shared_ptr msg);
     void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
     void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
@@ -213,12 +213,12 @@ const std::string& RecoverableQueueImpl:
     return queue->getName();
 }
     
-void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst)
+void RecoverableQueueImpl::setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>&
inst)
 {
     queue->setExternalQueueStore(inst);
 }
 
-ExternalQueueStore* RecoverableQueueImpl::getExternalQueueStore() const
+boost::shared_ptr<ExternalQueueStore> RecoverableQueueImpl::getExternalQueueStore()
const
 {
 	return queue->getExternalQueueStore();
 }

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1133535&r1=1133534&r2=1133535&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Jun  8 19:55:02
2011
@@ -763,8 +763,8 @@ isInSequenceSetAnd(const SequenceSet& s,
 
 /** Design notes for asychronous completion of Message.accept **
  * Message.Accept command cannot be considered "complete" until all messages
- * identified by the sequence set that accompanys the command have been
- * completely dequeued.  A message dequeue may not be able to complet
+ * identified by the sequence set that accompanies the command have been
+ * completely dequeued.  A message dequeue operation may not be able to complete
  * synchronously - specifically when the message is durable.  Therefore, the
  * Message.Accept command handling must be able to track asynchronous dequeues,
  * and notify the Session when all dequeues are done (at which point the
@@ -772,12 +772,14 @@ isInSequenceSetAnd(const SequenceSet& s,
  */
 namespace {
 
-    /** Manage a Message.accept that requires async completion of one or more
-     * message dequeue operations */
+    /** Manage a Message.accept command that requires async completion of one
+     * or more message dequeue operations.  An instance is registered with the
+     * SessionContext for each asynchronous Message.accept that is "in
+     * flight" */
     class AsyncMessageAcceptCmd : public SessionContext::AsyncCommandContext
     {
         mutable qpid::sys::Mutex lock;
-        std::set<DeliveryId> pending;    // for dequeue to complete
+        std::map<DeliveryId, boost::shared_ptr<Queue> > pending;    // for dequeue
to complete
         bool ready;
         SemanticState& state;
 
@@ -788,28 +790,30 @@ namespace {
         /** called from session to urge pending dequeues to complete ASAP */
         void flush()
         {
-            std::set<DeliveryId> copy;
+            QPID_LOG(trace, "Flushing pending message.accept cmd=" << getId());
+            std::map<DeliveryId, boost::shared_ptr<Queue> > copy;
             {
                 Mutex::ScopedLock l(lock);
                 copy = pending;
             }
-            for (std::set<DeliveryId>::iterator i = copy.begin();
+            std::set<Queue *> flushedQs;    // flush each queue only once!
+            for (std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i
= copy.begin();
                  i != copy.end(); ++i) {
-                for (DeliveryRecords::iterator r = state.getUnacked().begin();
-                     r != state.getUnacked().end(); ++r) {
-                    if (r->getId() == *i) {
-                        r->getQueue()->flush();
-                        break;
-                    }
+                Queue *queue(i->second.get());
+                if (flushedQs.find(queue) == flushedQs.end()) {
+                    flushedQs.insert(queue);
+                    i->second->flush();
                 }
             }
         }
 
         /** add a pending dequeue to track */
-        void add( const DeliveryId& id )
+        void add( const DeliveryId& id, const boost::shared_ptr<Queue>& queue
)
         {
+            QPID_LOG(trace, "Scheduling dequeue of delivery " << id
+                     << " on session " << state.getSession().getSessionId());
             Mutex::ScopedLock l(lock);
-            bool unique = pending.insert(id).second;
+            bool unique = pending.insert(std::pair<DeliveryId, boost::shared_ptr<Queue>
>(id, queue)).second;
             if (!unique) {
                 assert(false);
             }
@@ -818,22 +822,27 @@ namespace {
         /** signal this dequeue done. Note: may be run in *any* thread */
         void complete( const DeliveryId& id )
         {
+            QPID_LOG(trace, "Dequeue of delivery " << id
+                     << " completed on session " << state.getSession().getSessionId());
             Mutex::ScopedLock l(lock);
-            std::set<DeliveryId>::iterator i = pending.find(id);
+            std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = pending.find(id);
             assert(i != pending.end());
             pending.erase(i);
 
             if (ready && pending.empty()) {
                 framing::Invoker::Result r;   // message.accept does not return result data
                 Mutex::ScopedUnlock ul(lock);
+                QPID_LOG(trace, "Completing async message.accept cmd=" << getId());
                 completed( r );
             }
         }
 
         /** allow the Message.Accept to complete - do this only after all
-         * deliveryIds have been added() */
+         * deliveryIds have been added() and this has been registered with the
+         * SessionContext */
         void enable()
         {
+            QPID_LOG(trace, "Dispatching async message.accept cmd=" << getId());
             Mutex::ScopedLock l(lock);
             if (pending.empty()) {
                 framing::Invoker::Result r;
@@ -846,9 +855,10 @@ namespace {
     };
 
 
-    /** callback to indicate a single message has completed its dequeue.  This
-        object is made available to the queue, which will invoke it when the
-        dequeue completes. */
+    /** callback to indicate a single message has completed its asynchronous
+        dequeue.  This object is made available to the queue when a dequeue is
+        started.  The queue will invoke the callback when the dequeue
+        completes. */
     class DequeueDone : public Queue::DequeueDoneCallback
     {
         DeliveryId id;
@@ -862,28 +872,31 @@ namespace {
 
 
     /** factory to create the above callback - passed to queue's dequeue
-        method, only used if dequeue is asynchronous! */
+        method, only invoked if the dequeue operation is asynchronous! */
     boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state,
                                                            const DeliveryId& id,
-                                                           boost::intrusive_ptr<AsyncMessageAcceptCmd>&
cmd )
+                                                           const boost::shared_ptr<Queue>&
queue,
+                                                           boost::intrusive_ptr<AsyncMessageAcceptCmd>*
cmd )
     {
-        if (!cmd) {     // first async dequeue creates the context
-            cmd.reset(new AsyncMessageAcceptCmd(*state));
+        if (!cmd->get()) {     // first async dequeue creates the context
+            cmd->reset(new AsyncMessageAcceptCmd(*state));
         }
-        cmd->add( id );
-        boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) );
+        (*cmd)->add( id, queue );
+        boost::shared_ptr<DequeueDone> x( new DequeueDone(id, *cmd ) );
         return x;
     }
 
-    /** predicate to process unacked delivery records */
+    /** predicate to process unacked delivery records during Message.accept
+        processing */
     bool acceptDelivery( SemanticState *state,
-                         boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd,
+                         boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd,
                          DeliveryRecord& dr )
     {
-        Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), cmd);
+        Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), dr.getQueue(),
cmd);
         return dr.accept((TransactionContext*) 0, &f);
     }
-}
+}   // namespace
+
 
 void SemanticState::accepted(const SequenceSet& commands) {
     assertClusterSafe();
@@ -921,7 +934,7 @@ void SemanticState::accepted(const Seque
                     isInSequenceSetAnd(commands,
                                        bind(acceptDelivery,
                                             this,
-                                            cmd,
+                                            &cmd,
                                             _1)));
         unacked.erase(removed, unacked.end());
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/tests/AsyncCompletion.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/tests/AsyncCompletion.cpp?rev=1133535&r1=1133534&r2=1133535&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/tests/AsyncCompletion.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/tests/AsyncCompletion.cpp Wed Jun  8 19:55:02 2011
@@ -56,6 +56,10 @@ class AsyncCompletionMessageStore : publ
   public:
     sys::BlockingQueue<boost::intrusive_ptr<PersistableMessage> > enqueued;
 
+    typedef std::pair<boost::intrusive_ptr<PersistableMessage>,
+                      boost::shared_ptr<PersistableQueue> > DequeueRecord;
+    sys::BlockingQueue<DequeueRecord> dequeued;
+
     AsyncCompletionMessageStore() : NullMessageStore() {}
     ~AsyncCompletionMessageStore(){}
 
@@ -65,6 +69,13 @@ class AsyncCompletionMessageStore : publ
     {
         enqueued.push(msg);
     }
+
+    void dequeue(TransactionContext*,
+                 const boost::intrusive_ptr<PersistableMessage>& msg,
+                 const boost::shared_ptr<PersistableQueue>& queue)
+    {
+        dequeued.push(DequeueRecord(msg, queue));
+    }
 };
 
 QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite)
@@ -102,6 +113,44 @@ QPID_AUTO_TEST_CASE(testWaitTillComplete
         enqueued[k]->enqueueComplete();
     }
     sync.wait();                // Should complete now, all messages are completed.
+
+    // now test the dequeue: messageAccept should not complete until all pending
+    // dequeues complete
+    SubscriptionSettings settings;
+    settings.acceptMode = ACCEPT_MODE_EXPLICIT;
+    settings.autoAck = 0;
+    settings.completionMode = COMPLETE_ON_ACCEPT;
+
+    LocalQueue q;
+    Subscription sub = fix.subs.subscribe(q, "q", settings);
+    s.messageFlush(arg::destination=sub.getName());
+    SequenceSet accepted;
+    for (int x = 0; x < count; x++) {
+        Message m;
+        BOOST_CHECK(q.get(m, TIME_SEC * 3));
+        accepted.add(m.getId());
+    }
+
+    Completion accept = s.messageAccept(accepted, arg::sync=true);
+    sync = s.executionSync(arg::sync=true);
+
+    std::vector<AsyncCompletionMessageStore::DequeueRecord> dequeued;
+    for (int y = 0; y < count; y++) {
+        dequeued.push_back(store->dequeued.pop(TIME_SEC * 3));
+    }
+
+    sleep( 1 );   // even with this, accept should NOT complete!
+
+    for (int z = count-1; z >= 0; --z) {
+        BOOST_CHECK(!accept.isComplete()); // Should not be complete yet.
+        BOOST_CHECK(!sync.isComplete()); // Should not be complete yet.
+        // now complete the dequeue of the message:
+        dequeued[z].second->dequeueComplete(dequeued[z].first);
+    }
+
+    // now both should complete.
+    accept.wait();
+    sync.wait();
 }
 
 QPID_AUTO_TEST_CASE(testGetResult) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message