qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1244067 - in /qpid/branches/qpid-3603-6/qpid/cpp/src: qpid/broker/ qpid/cluster/ qpid/ha/ tests/
Date Tue, 14 Feb 2012 16:06:33 GMT
Author: aconway
Date: Tue Feb 14 16:06:33 2012
New Revision: 1244067

URL: http://svn.apache.org/viewvc?rev=1244067&view=rev
Log:
QPID-3603: Handle backup crash/shutdown.

If a backup crashes or shuts down any messages that have
been delayed completion for that backup must be marked
complete to avoid the primary hanging.

Modified:
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Consumer.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/branches/qpid-3603-6/qpid/cpp/src/tests/DeliveryRecordTest.cpp
    qpid/branches/qpid-3603-6/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Consumer.h?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/Consumer.h Tue Feb 14 16:06:33 2012
@@ -34,10 +34,12 @@ class QueueListeners;
 /**
  * Base class for consumers which represent a subscription to a queue.
  */
-class Consumer {
+class Consumer
+{
     const bool acquires;
-    // inListeners allows QueueListeners to efficiently track if this instance is registered
-    // for notifications without having to search its containers
+    // inListeners allows QueueListeners to efficiently track if this
+    // instance is registered for notifications without having to
+    // search its containers
     bool inListeners;
     // the name is generated by broker and is unique within broker scope.  It is not
     // provided or known by the remote Consumer.
@@ -61,7 +63,12 @@ class Consumer {
     virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
     virtual OwnershipToken* getSession() = 0;
     virtual void cancel() = 0;
-    virtual bool isDelayedCompletion() const { return false; }
+
+    /** Called when the peer has acknowledged receipt of the message.
+     * Not to be confused with accept() above, which is asking if
+     * this consumer will consume/browse the message.
+     */
+    virtual void acknowledged(const QueuedMessage&) = 0;
 
   protected:
     framing::SequenceNumber position;

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Feb 14 16:06:33
2012
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
 #include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/SemanticState.h"
+#include "qpid/broker/Consumer.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/log/Statement.h"
@@ -31,23 +32,25 @@ using namespace qpid;
 using namespace qpid::broker;
 using std::string;
 
-DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, 
-                               const Queue::shared_ptr& _queue, 
+DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
+                               const Queue::shared_ptr& _queue,
                                const std::string& _tag,
+                               const boost::shared_ptr<Consumer>& _consumer,
                                bool _acquired,
-                               bool accepted, 
+                               bool accepted,
                                bool _windowing,
-                               uint32_t _credit, bool _isDelayedCompletion) : msg(_msg),
-                                                  queue(_queue), 
-                                                  tag(_tag),
-                                                  acquired(_acquired),
-                                                  acceptExpected(!accepted),
-                                                  cancelled(false),
-                                                  completed(false),
-                                                  ended(accepted && acquired),
-                                                  windowing(_windowing),
-                                                  credit(msg.payload ? msg.payload->getRequiredCredit()
: _credit),
-                                                  isDelayedCompletion(_isDelayedCompletion)
+                               uint32_t _credit):
+    msg(_msg),
+    queue(_queue),
+    tag(_tag),
+    consumer(_consumer),
+    acquired(_acquired),
+    acceptExpected(!accepted),
+    cancelled(false),
+    completed(false),
+    ended(accepted && acquired),
+    windowing(_windowing),
+    credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
 {}
 
 bool DeliveryRecord::setEnded()
@@ -95,7 +98,7 @@ void DeliveryRecord::requeue() const
     }
 }
 
-void DeliveryRecord::release(bool setRedelivered) 
+void DeliveryRecord::release(bool setRedelivered)
 {
     if (acquired && !ended) {
         if (setRedelivered) msg.payload->redeliver();
@@ -108,19 +111,13 @@ void DeliveryRecord::release(bool setRed
 }
 
 void DeliveryRecord::complete()  {
-    completed = true; 
+    completed = true;
 }
 
 bool DeliveryRecord::accept(TransactionContext* ctxt) {
     if (!ended) {
-        if (acquired) {
-            queue->dequeue(ctxt, msg);
-        } else if (isDelayedCompletion) {
-            // FIXME aconway 2011-12-05: This should be done in HA code.
-            msg.payload->getIngressCompletion().finishCompleter();
-            QPID_LOG(debug, "Completed " << msg.queue->getName()
-                     << "[" << msg.position << "]");
-        }
+        consumer->acknowledged(getMessage());
+        if (acquired) queue->dequeue(ctxt, msg);
         setEnded();
         QPID_LOG(debug, "Accepted " << id);
     }
@@ -137,8 +134,8 @@ void DeliveryRecord::committed() const{
     queue->dequeueCommitted(msg);
 }
 
-void DeliveryRecord::reject() 
-{    
+void DeliveryRecord::reject()
+{
     if (acquired && !ended) {
         Exchange::shared_ptr alternate = queue->getAlternateExchange();
         if (alternate) {
@@ -174,7 +171,7 @@ void DeliveryRecord::acquire(DeliveryIds
     }
 }
 
-void DeliveryRecord::cancel(const std::string& cancelledTag) 
+void DeliveryRecord::cancel(const std::string& cancelledTag)
 {
     if (tag == cancelledTag)
         cancelled = true;
@@ -193,7 +190,7 @@ AckRange DeliveryRecord::findRange(Deliv
 namespace qpid {
 namespace broker {
 
-std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) 
+std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r)
 {
     out << "{" << "id=" << r.id.getValue();
     out << ", tag=" << r.tag << "}";

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/DeliveryRecord.h Tue Feb 14 16:06:33
2012
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,6 +38,7 @@ namespace broker {
 class TransactionContext;
 class SemanticState;
 struct AckRange;
+class Consumer;
 
 /**
  * Record of a delivery for which an ack is outstanding.
@@ -47,6 +48,7 @@ class DeliveryRecord
     QueuedMessage msg;
     mutable boost::shared_ptr<Queue> queue;
     std::string tag;    // name of consumer
+    boost::shared_ptr<Consumer> consumer;
     DeliveryId id;
     bool acquired : 1;
     bool acceptExpected : 1;
@@ -63,21 +65,20 @@ class DeliveryRecord
      * after that).
      */
     uint32_t credit;
-    bool isDelayedCompletion;
 
   public:
     QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
                                       const boost::shared_ptr<Queue>& queue, 
                                       const std::string& tag,
+                                      const boost::shared_ptr<Consumer>& consumer,
                                       bool acquired,
                                       bool accepted,
                                       bool windowing,
-                                      uint32_t credit=0,       // Only used if msg is empty.
-                                      bool isDelayedCompletion=false
+                                      uint32_t credit=0 // Only used if msg is empty.
     );
-    
+
     bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id);
}
-    
+
     void dequeue(TransactionContext* ctxt = 0) const;
     void requeue() const;
     void release(bool setRedelivered);
@@ -97,7 +98,7 @@ class DeliveryRecord
     bool isAccepted() const { return !acceptExpected; }
     bool isEnded() const { return ended; }
     bool isWindowing() const { return windowing; }
-    
+
     uint32_t getCredit() const;
     const std::string& getTag() const { return tag; }
 
@@ -134,7 +135,7 @@ typedef DeliveryRecord::DeliveryRecords 
 struct AckRange
 {
     DeliveryRecords::iterator start;
-    DeliveryRecords::iterator end;    
+    DeliveryRecords::iterator end;
     AckRange(DeliveryRecords::iterator _start, DeliveryRecords::iterator _end) : start(_start),
end(_end) {}
 };
 

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Feb 14 16:06:33
2012
@@ -341,7 +341,8 @@ bool SemanticState::ConsumerImpl::delive
 {
     assertClusterSafe();
     allocateCredit(msg.payload);
-    DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected,
credit.isWindowMode(), 0, isDelayedCompletion());
+    DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(),
+                          shared_from_this(), acquire, !ackExpected, credit.isWindowMode(),
0);
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
     parent->deliver(record, sync);
@@ -364,7 +365,7 @@ bool SemanticState::ConsumerImpl::filter
 bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
 {
     assertClusterSafe();
-    // FIXME aconway 2009-06-08: if we have byte & message credit but
+    // TODO aconway 2009-06-08: if we have byte & message credit but
     // checkCredit fails because the message is to big, we should
     // remain on queue's listener list for possible smaller messages
     // in future.

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/broker/SemanticState.h Tue Feb 14 16:06:33
2012
@@ -149,14 +149,11 @@ class SemanticState : private boost::non
         SemanticState& getParent() { return *parent; }
         const SemanticState& getParent() const { return *parent; }
 
-        // Manageable entry points
+        void acknowledged(const broker::QueuedMessage&) {}
+
+        // manageable entry points
         management::ManagementObject* GetManagementObject (void) const;
         management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args&
args, std::string& text);
-
-        /** This consumer wants delayed completion.
-         * Overridden by ConsumerImpl subclasses.
-         */
-        virtual bool isDelayedCompletion() const { return false; }
     };
 
     typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Feb 14 16:06:33
2012
@@ -549,7 +549,7 @@ void Connection::deliveryRecord(const st
         } else {                // Message at original position in original queue
             queue->find(position, m);
         }
-        // FIXME aconway 2011-08-19: removed:
+        // NOTE: removed:
         // if (!m.payload)
         //      throw Exception(QPID_MSG("deliveryRecord no update message"));
         //
@@ -561,7 +561,8 @@ void Connection::deliveryRecord(const st
         //
     }
 
-    broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
+    broker::DeliveryRecord dr(m, queue, tag, semanticState().find(tag),
+                              acquired, accepted, windowing, credit);
     dr.setId(id);
     if (cancelled) dr.cancel(dr.getTag());
     if (completed) dr.complete();

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/HaBroker.cpp Tue Feb 14 16:06:33 2012
@@ -70,6 +70,7 @@ HaBroker::HaBroker(broker::Broker& b, co
         boost::shared_ptr<ReplicatingSubscription::Factory>(
             new ReplicatingSubscription::Factory()));
 }
+
 HaBroker::~HaBroker() {}
 
 Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&)
{

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Tue Feb 14
16:06:33 2012
@@ -132,20 +132,68 @@ bool ReplicatingSubscription::deliver(Qu
 
 ReplicatingSubscription::~ReplicatingSubscription() {}
 
+
+// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
+
+// Mark a message completed. May be called by acknowledge or dequeued
+void ReplicatingSubscription::complete(
+    const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
+{
+    // Handle completions for the subscribed queue, not the internal event queue.
+    if (qm.queue && qm.queue == getQueue().get()) {
+        QPID_LOG(trace, logPrefix << "Completed message " << qm.position);
+        Delayed::iterator i= delayed.find(qm.position);
+        // The same message can be completed twice, by acknowledged and
+        // dequeued, remove it from the set so it only gets completed
+        // once.
+        if (i != delayed.end()) {
+            assert(i->second.payload == qm.payload);
+            qm.payload->getIngressCompletion().finishCompleter();
+            delayed.erase(i);
+        }
+    }
+}
+
+// Called before we get notified of the message being available and
+// under the message lock in the queue. Called in arbitrary connection thread.
+void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
+    sys::Mutex::ScopedLock l(lock);
+    // Delay completion
+    QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position);
+    qm.payload->getIngressCompletion().startCompleter();
+    assert(delayed.find(qm.position) == delayed.end());
+    delayed[qm.position] = qm;
+}
+
+
+// Function to complete a delayed message, called by cancel()
+void ReplicatingSubscription::cancelComplete(
+    const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
+{
+    QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position);
+    v.second.payload->getIngressCompletion().finishCompleter();
+}
+
 // Called in the subscription's connection thread.
 void ReplicatingSubscription::cancel()
 {
-    QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
     getQueue()->removeObserver(
         boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+    {
+        sys::Mutex::ScopedLock l(lock);
+        QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
+        for_each(delayed.begin(), delayed.end(),
+                 boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
+        delayed.clear();
+    }
     ConsumerImpl::cancel();
 }
 
-// Called before we get notified of the message being available and
-// under the message lock in the queue. Called in arbitrary connection thread.
-void ReplicatingSubscription::enqueued(const QueuedMessage& m) {
-    //delay completion
-    m.payload->getIngressCompletion().startCompleter();
+// Called on primary in the backups IO thread.
+void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
+    sys::Mutex::ScopedLock l(lock);
+    // Finish completion of message, it has been acknowledged by the backup.
+    complete(msg, l);
 }
 
 // Called with lock held. Called in subscription's connection thread.
@@ -160,6 +208,21 @@ void ReplicatingSubscription::sendDequeu
     sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
 }
 
+// Called after the message has been removed from the deque and under
+// the messageLock in the queue. Called in arbitrary connection threads.
+void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
+{
+    {
+        sys::Mutex::ScopedLock l(lock);
+        QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position);
+        dequeues.add(qm.position);
+        // If we have not yet sent this message to the backup, then
+        // complete it now as it will never be accepted.
+        if (qm.position > position) complete(qm, l);
+    }
+    notify();                   // Ensure a call to doDispatch
+}
+
 // Called with lock held. Called in subscription's connection thread.
 void ReplicatingSubscription::sendPositionEvent(
     SequenceNumber position, const sys::Mutex::ScopedLock&l )
@@ -205,28 +268,6 @@ void ReplicatingSubscription::sendEvent(
     events->dispatch(consumer);
 }
 
-// Called after the message has been removed from the deque and under
-// the messageLock in the queue. Called in arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const QueuedMessage& m)
-{
-    {
-        sys::Mutex::ScopedLock l(lock);
-        dequeues.add(m.position);
-        // If we have not yet sent this message to the backup, then
-        // complete it now as it will never be accepted.
-
-        // FIXME aconway 2012-01-05: suspect use of position in
-        // foreign connection thread.  Race with deliver() which is
-        // not under the message lock?
-        if (m.position > position) {
-            m.payload->getIngressCompletion().finishCompleter();
-            QPID_LOG(trace, logPrefix << "Dequeued and completed message " <<
m.position << " early");
-        }
-        else
-            QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
-    }
-    notify();                   // Ensure a call to doDispatch
-}
 
 // Called in subscription's connection thread.
 bool ReplicatingSubscription::doDispatch()
@@ -244,7 +285,6 @@ bool ReplicatingSubscription::Delegating
 void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
 bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message>
msg) { return delegate.filter(msg); }
 bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message>
msg) { return delegate.accept(msg); }
-void ReplicatingSubscription::DelegatingConsumer::cancel() {}
 OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession();
}
 
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Tue Feb 14 16:06:33
2012
@@ -85,17 +85,21 @@ class ReplicatingSubscription : public b
 
     // Consumer overrides.
     void cancel();
-    bool isDelayedCompletion() const { return true; }
+    void acknowledged(const broker::QueuedMessage&);
 
   protected:
     bool doDispatch();
   private:
+    typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
     std::string logPrefix;
     boost::shared_ptr<broker::Queue> events;
     boost::shared_ptr<broker::Consumer> consumer;
-    qpid::framing::SequenceSet dequeues;
+    Delayed delayed;
+    framing::SequenceSet dequeues;
     framing::SequenceNumber backupPosition;
 
+    void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
+    void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
     void sendDequeueEvent(const sys::Mutex::ScopedLock&);
     void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
     void sendEvent(const std::string& key, framing::Buffer&,
@@ -110,9 +114,11 @@ class ReplicatingSubscription : public b
         void notify();
         bool filter(boost::intrusive_ptr<broker::Message>);
         bool accept(boost::intrusive_ptr<broker::Message>);
-        void cancel();
-        bool isDelayedCompletion() const { return false; }
+        void cancel() {}
+        void acknowledged(const broker::QueuedMessage&) {}
+
         broker::OwnershipToken* getSession();
+
       private:
         ReplicatingSubscription& delegate;
     };

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/DeliveryRecordTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/tests/DeliveryRecordTest.cpp?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/tests/DeliveryRecordTest.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/tests/DeliveryRecordTest.cpp Tue Feb 14 16:06:33
2012
@@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort)
 
     list<DeliveryRecord> records;
     for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
-        DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", false, false, false);
+        DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", Consumer::shared_ptr(),
false, false, false);
         r.setId(*i);
         records.push_back(r);
     }

Modified: qpid/branches/qpid-3603-6/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-6/qpid/cpp/src/tests/QueueTest.cpp?rev=1244067&r1=1244066&r2=1244067&view=diff
==============================================================================
--- qpid/branches/qpid-3603-6/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-3603-6/qpid/cpp/src/tests/QueueTest.cpp Tue Feb 14 16:06:33 2012
@@ -67,6 +67,7 @@ public:
     };
     void notify() {}
     void cancel() {}
+    void acknowledged(const QueuedMessage&) {}
     OwnershipToken* getSession() { return 0; }
 };
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message