qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1154376 - in /qpid/branches/qpid-3346/qpid/cpp/src: qpid/broker/ qpid/cluster/ tests/
Date Fri, 05 Aug 2011 20:47:11 GMT
Author: kgiusti
Date: Fri Aug  5 20:47:10 2011
New Revision: 1154376

URL: http://svn.apache.org/viewvc?rev=1154376&view=rev
Log:
QPID-3346: refactor queue interface to support consumer-based message selection.

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Consumer.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueEvents.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Consumer.h?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Consumer.h Fri Aug  5 20:47:10 2011
@@ -36,13 +36,17 @@ class Consumer {
     // inListeners allows QueueListeners to efficiently track if this instance is registered
     // for notifications without having to search its containers
     bool inListeners;
-  public:
-    typedef boost::shared_ptr<Consumer> shared_ptr;            
-    
+    const std::string name;
+ public:
+    typedef boost::shared_ptr<Consumer> shared_ptr;
+
     framing::SequenceNumber position;
-    
-    Consumer(bool preAcquires = true) : acquires(preAcquires), inListeners(false) {}
+
+    Consumer(const std::string& _name, bool preAcquires = true)
+      : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
     bool preAcquires() const { return acquires; }
+    const std::string& getName() const { return name; }
+
     virtual bool deliver(QueuedMessage& msg) = 0;
     virtual void notify() = 0;
     virtual bool filter(boost::intrusive_ptr<Message>) { return true; }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Aug  5 20:47:10 2011
@@ -142,7 +142,7 @@ void DeliveryRecord::reject() 
             //just drop it
             QPID_LOG(info, "Dropping rejected message from " << queue->getName());
         }
-        dequeue();
+        queue->dequeue(0, msg);
         setEnded();
     }
 }
@@ -152,8 +152,14 @@ uint32_t DeliveryRecord::getCredit() con
     return credit;
 }
 
-void DeliveryRecord::acquire(DeliveryIds& results) {
-    if (queue->acquire(msg)) {
+void DeliveryRecord::acquire(SemanticState* const session, DeliveryIds& results) {
+    SemanticState::ConsumerImpl::shared_ptr consumer;
+
+    if (!session->find( tag, consumer )) {
+        QPID_LOG(error, "Can't acquire message " << id.getValue() << ": original subscription no longer exists.");
+    }
+
+    if (queue->acquire(msg, consumer)) {
         acquired = true;
         results.push_back(id);
         if (!acceptExpected) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Aug  5 20:47:10 2011
@@ -46,7 +46,7 @@ class DeliveryRecord
 {
     QueuedMessage msg;
     mutable boost::shared_ptr<Queue> queue;
-    std::string tag;
+    std::string tag;    // name of consumer
     DeliveryId id;
     bool acquired : 1;
     bool acceptExpected : 1;
@@ -82,7 +82,7 @@ class DeliveryRecord
     void reject();
     void cancel(const std::string& tag);
     void redeliver(SemanticState* const);
-    void acquire(DeliveryIds& results);
+    void acquire(SemanticState* const, DeliveryIds& results);
     void complete();
     bool accept(TransactionContext* ctxt); // Returns isRedundant()
     bool setEnded();            // Returns isRedundant()
@@ -90,7 +90,7 @@ class DeliveryRecord
 
     bool isAcquired() const { return acquired; }
     bool isComplete() const { return completed; }
-    bool isRedundant() const { return ended && (!windowing || completed); }
+    bool isRedundant() const { return ended && (!windowing || completed); }     // msg no longer needed - can discard
     bool isCancelled() const { return cancelled; }
     bool isAccepted() const { return !acceptExpected; }
     bool isEnded() const { return ended; }
@@ -117,13 +117,14 @@ inline bool operator<(const DeliveryReco
 
 struct AcquireFunctor
 {
+    SemanticState* session;
     DeliveryIds& results;
 
-    AcquireFunctor(DeliveryIds& _results) : results(_results) {}
+    AcquireFunctor(SemanticState* _session, DeliveryIds& _results) : session(_session), results(_results) {}
 
     void operator()(DeliveryRecord& record)
     {
-        record.acquire(results);
+        record.acquire(session, results);
     }
 };
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp Fri Aug  5 20:47:10 2011
@@ -35,7 +35,9 @@ void LegacyLVQ::setNoBrowse(bool b)
 bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     Ordering::iterator i = messages.find(position);
-    if (i != messages.end() && i->second.payload == message.payload) {
+    if (i != messages.end() &&
+        // @todo KAG: gsim? is a bug? message is a *return* value - we really shouldn't check ".payload" below:
+        i->second.payload == message.payload) {
         message = i->second;
         erase(i);
         return true;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Fri Aug  5 20:47:10 2011
@@ -87,6 +87,28 @@ const int ENQUEUE_ONLY=1;
 const int ENQUEUE_AND_DEQUEUE=2;
 }
 
+
+// KAG TBD: find me a home....
+namespace qpid {
+namespace broker {
+
+class MessageSelector
+{
+ protected:
+    Queue *queue;
+ public:
+    MessageSelector( Queue *q ) : queue(q) {}
+    virtual ~MessageSelector() {};
+
+    // assumes caller holds messageLock
+    virtual bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
+                              const Mutex::ScopedLock&);
+    virtual bool canAcquire(Consumer::shared_ptr consumer, const QueuedMessage& qm,
+                            const Mutex::ScopedLock&);
+};
+
+}}
+
 Queue::Queue(const string& _name, bool _autodelete,
              MessageStore* const _store,
              const OwnershipToken* const _owner,
@@ -111,7 +133,8 @@ Queue::Queue(const string& _name, bool _
     broker(b),
     deleted(false),
     barrier(*this),
-    autoDeleteTimeout(0)
+    autoDeleteTimeout(0),
+    selector(new MessageSelector( this ))   // KAG TODO: FIX!!
 {
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
@@ -220,6 +243,14 @@ void Queue::requeue(const QueuedMessage&
             	enqueue(0, payload);
             }
         }
+
+        for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+            try{
+                (*i)->requeued(msg);
+            } catch (const std::exception& e) {
+                QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
+            }
+        }
     }
     copy.notify();
 }
@@ -229,7 +260,7 @@ bool Queue::acquireMessageAt(const Seque
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
-    if (messages->remove(position, message)) {
+    if (acquire(position, message )) {
         QPID_LOG(debug, "Acquired message at " << position << " from " << name);
         return true;
     } else {
@@ -238,9 +269,24 @@ bool Queue::acquireMessageAt(const Seque
     }
 }
 
-bool Queue::acquire(const QueuedMessage& msg) {
-    QueuedMessage copy = msg;
-    return acquireMessageAt(msg.position, copy);
+bool Queue::acquire(const QueuedMessage& msg, Consumer::shared_ptr c)
+{
+    Mutex::ScopedLock locker(messageLock);
+    assertClusterSafe();
+    QPID_LOG(debug, c->getName() << " attempting to acquire message at " << msg.position);
+
+    if (!selector->canAcquire( c, msg, locker )) {
+        QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
+        return false;
+    }
+
+    QueuedMessage copy(msg);
+    if (acquire( msg.position, copy )) {
+        QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
+        return true;
+    }
+    QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position");
+    return false;
 }
 
 void Queue::notifyListener()
@@ -276,44 +322,60 @@ bool Queue::getNextMessage(QueuedMessage
 
 Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
+
     while (true) {
         Mutex::ScopedLock locker(messageLock);
-        if (messages->empty()) {
-            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+        QueuedMessage msg;
+
+        if (!selector->nextMessage(c, msg, locker)) { // no next available
+            QPID_LOG(debug, "No messages available to dispatch to consumer " <<
+                     c->getName() << " on queue '" << name << "'");
             listeners.addListener(c);
             return NO_MESSAGES;
-        } else {
-            QueuedMessage msg = messages->front();
-            if (msg.payload->hasExpired()) {
-                QPID_LOG(debug, "Message expired from queue '" << name << "'");
-                popAndDequeue();
-                continue;
-            }
+        }
 
-            if (c->filter(msg.payload)) {
-                if (c->accept(msg.payload)) {
-                    m = msg;
-                    pop();
-                    return CONSUMED;
-                } else {
-                    //message(s) are available but consumer hasn't got enough credit
-                    QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
-                    return CANT_CONSUME;
-                }
+        if (msg.payload->hasExpired()) {
+            QPID_LOG(debug, "Message expired from queue '" << name << "'");
+            c->position = msg.position;
+            acquire( msg.position, msg );
+            dequeue( 0, msg );
+            continue;
+        }
+
+        // a message is available for this consumer - can the consumer use it?
+
+        if (c->filter(msg.payload)) {
+            if (c->accept(msg.payload)) {
+                acquire( msg.position, m );
+                c->position = msg.position;
+                return CONSUMED;
             } else {
-                //consumer will never want this message
-                QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+                //message(s) are available but consumer hasn't got enough credit
+                QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
                 return CANT_CONSUME;
             }
+        } else {
+            //consumer will never want this message
+            QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+            c->position = msg.position;
+            return CANT_CONSUME;
         }
     }
 }
 
-
 bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
-    QueuedMessage msg(this);
-    while (seek(msg, c)) {
+    while (true) {
+        Mutex::ScopedLock locker(messageLock);
+        QueuedMessage msg;
+
+        if (!selector->nextMessage(c, msg, locker)) { // no next available
+            QPID_LOG(debug, "No browsable messages available for consumer " <<
+                     c->getName() << " on queue '" << name << "'");
+            listeners.addListener(c);
+            return false;
+        }
+
         if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
             if (c->accept(msg.payload)) {
                 //consumer wants the message
@@ -327,8 +389,8 @@ bool Queue::browseNextMessage(QueuedMess
             }
         } else {
             //consumer will never want this message, continue seeking
-            c->position = msg.position;
             QPID_LOG(debug, "Browser skipping message from '" << name << "'");
+            c->position = msg.position;
         }
     }
     return false;
@@ -358,61 +420,71 @@ bool Queue::dispatch(Consumer::shared_pt
     }
 }
 
-// Find the next message
-bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
-    Mutex::ScopedLock locker(messageLock);
-    if (messages->next(c->position, msg)) {
-        return true;
-    } else {
-        listeners.addListener(c);
-        return false;
-    }
-}
-
-QueuedMessage Queue::find(SequenceNumber pos) const {
+bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
 
     Mutex::ScopedLock locker(messageLock);
-    QueuedMessage msg;
-    messages->find(pos, msg);
-    return msg;
+    if (messages->find(pos, msg))
+        return true;
+    return false;
 }
 
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
     assertClusterSafe();
-    Mutex::ScopedLock locker(consumerLock);
-    if(exclusive) {
-        throw ResourceLockedException(
-            QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
-    } else if(requestExclusive) {
-        if(consumerCount) {
+    {
+        Mutex::ScopedLock locker(consumerLock);
+        if(exclusive) {
             throw ResourceLockedException(
-                QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
-        } else {
-            exclusive = c->getSession();
+                                          QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+        } else if(requestExclusive) {
+            if(consumerCount) {
+                throw ResourceLockedException(
+                                              QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+            } else {
+                exclusive = c->getSession();
+            }
+        }
+        consumerCount++;
+        if (mgmtObject != 0)
+            mgmtObject->inc_consumerCount ();
+        //reset auto deletion timer if necessary
+        if (autoDeleteTimeout && autoDeleteTask) {
+            autoDeleteTask->cancel();
         }
     }
-    consumerCount++;
-    if (mgmtObject != 0)
-        mgmtObject->inc_consumerCount ();
-    //reset auto deletion timer if necessary
-    if (autoDeleteTimeout && autoDeleteTask) {
-        autoDeleteTask->cancel();
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            Mutex::ScopedLock locker(messageLock);
+            (*i)->consumerAdded(*c);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
+        }
     }
 }
 
 void Queue::cancel(Consumer::shared_ptr c){
     removeListener(c);
-    Mutex::ScopedLock locker(consumerLock);
-    consumerCount--;
-    if(exclusive) exclusive = 0;
-    if (mgmtObject != 0)
-        mgmtObject->dec_consumerCount ();
+    {
+        Mutex::ScopedLock locker(consumerLock);
+        consumerCount--;
+        if(exclusive) exclusive = 0;
+        if (mgmtObject != 0)
+            mgmtObject->dec_consumerCount ();
+    }
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            Mutex::ScopedLock locker(messageLock);
+            (*i)->consumerRemoved(*c);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
+        }
+    }
 }
 
 QueuedMessage Queue::get(){
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
-    messages->pop(msg);
+    if (messages->pop(msg))
+        consumed( msg );
     return msg;
 }
 
@@ -443,8 +515,15 @@ void Queue::purgeExpired(qpid::sys::Dura
             Mutex::ScopedLock locker(messageLock);
             messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
         }
-        for_each(expired.begin(), expired.end(),
-                 boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+
+        for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
+             i != expired.end(); ++i) {
+            {
+                Mutex::ScopedLock locker(messageLock);
+                consumed( *i );   // expects messageLock held
+            }
+            dequeue( 0, *i );
+        }
     }
 }
 
@@ -503,18 +582,33 @@ uint32_t Queue::move(const Queue::shared
         QueuedMessage qmsg = messages->front();
         boost::intrusive_ptr<Message> msg = qmsg.payload;
         destq->deliver(msg); // deliver message to the destination queue
-        pop();
-        dequeue(0, qmsg);
+        popAndDequeue();
         count++;
     }
     return count;
 }
 
+/** Acquire the front (oldest) message from the in-memory queue.
+ * assumes messageLock held by caller
+ */
 void Queue::pop()
 {
     assertClusterSafe();
-    messages->pop();
-    ++dequeueSincePurge;
+    QueuedMessage msg;
+    if (messages->pop(msg)) {
+        consumed( msg ); // mark it removed
+        ++dequeueSincePurge;
+    }
+}
+
+/** Acquire the message at the given position, return true and msg if acquire succeeds */
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg )
+{
+    if (messages->remove(position, msg)) {
+        consumed( msg );
+        return true;
+    }
+    return false;
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -533,6 +627,7 @@ void Queue::push(boost::intrusive_ptr<Me
     }
     copy.notify();
     if (dequeueRequired) {
+        consumed( removed );  // tell observers
         if (isRecovery) {
             //can't issue new requests for the store until
             //recovery is complete
@@ -696,14 +791,16 @@ void Queue::dequeueCommitted(const Queue
 }
 
 /**
- * Removes a message from the in-memory delivery queue as well
- * dequeing it from the logical (and persistent if applicable) queue
+ * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
+ * it from the logical (and persistent if applicable) queue
  */
 void Queue::popAndDequeue()
 {
-    QueuedMessage msg = messages->front();
-    pop();
-    dequeue(0, msg);
+    if (!messages->empty()) {
+        QueuedMessage msg = messages->front();
+        pop();
+        dequeue(0, msg);
+    }
 }
 
 /**
@@ -723,6 +820,20 @@ void Queue::dequeued(const QueuedMessage
     }
 }
 
+/** updates queue observers when a message has become unavailable for transfer,
+ * expects messageLock to be held
+ */
+void Queue::consumed(const QueuedMessage& msg)
+{
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->consumed(msg);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
+        }
+    }
+}
+
 
 void Queue::create(const FieldTable& _settings)
 {
@@ -1233,3 +1344,179 @@ void Queue::UsageBarrier::destroy()
     parent.deleted = true;
     while (count) parent.messageLock.wait();
 }
+
+
+// KAG TBD: flesh out...
+
+
+class MessageGroupManager : public QueueObserver, public MessageSelector
+{
+    const std::string groupIdHeader;    // msg header holding group identifier
+    struct GroupState {
+        const std::string group;          // group identifier
+        //Consumer::shared_ptr owner; // consumer with outstanding acquired messages
+        std::string owner; // consumer with outstanding acquired messages
+        uint32_t acquired;  // count of outstanding acquired messages
+        uint32_t total; // count of enqueued messages in this group
+        GroupState() : acquired(0), total(0) {}
+    };
+    std::map<std::string, struct GroupState> messageGroups;
+    std::set<std::string> consumers;
+
+ public:
+
+    MessageGroupManager(const std::string& header, Queue *q )
+        : QueueObserver(), MessageSelector(q), groupIdHeader( header ) {}
+    void enqueued( const QueuedMessage& qm );
+    void removed( const QueuedMessage& qm );
+    void requeued( const QueuedMessage& qm );
+    void dequeued( const QueuedMessage& qm );
+    void consumerAdded( const Consumer& );
+    void consumerRemoved( const Consumer& );
+    bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
+                      const Mutex::ScopedLock&);
+    bool canAcquire(Consumer::shared_ptr consumer, const QueuedMessage& msg,
+                    const Mutex::ScopedLock&);
+};
+
+
+namespace {
+    const std::string NO_GROUP("");
+    const std::string getGroupId( const QueuedMessage& qm, const std::string& key )
+    {
+        const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
+        if (!headers) return NO_GROUP;
+        return headers->getAsString(key);
+    }
+}
+
+
+void MessageGroupManager::enqueued( const QueuedMessage& qm )
+{
+    std::string group( getGroupId(qm, groupIdHeader) );
+    messageGroups[group].total++;
+}
+
+
+void MessageGroupManager::removed( const QueuedMessage& qm )
+{
+    std::string group( getGroupId(qm, groupIdHeader) );
+    std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+    assert( gs != messageGroups.end() );
+    gs->second.acquired += 1;
+}
+
+
+void MessageGroupManager::requeued( const QueuedMessage& qm )
+{
+    std::string group( getGroupId(qm, groupIdHeader) );
+    std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+    assert( gs != messageGroups.end() );
+    GroupState& state( gs->second );
+    assert( state.acquired != 0 );
+    state.acquired -= 1;
+    if (state.acquired == 0 && !state.owner.empty()) {
+        state.owner.clear();   // KAG TODO: need to invalidate consumer's positions?
+    }
+}
+
+
+void MessageGroupManager::dequeued( const QueuedMessage& qm )
+{
+    std::string group( getGroupId(qm, groupIdHeader) );
+    std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+    assert( gs != messageGroups.end() );
+    GroupState& state( gs->second );
+    assert( state.total != 0 );
+    state.total -= 1;
+    assert( state.acquired != 0 );
+    state.acquired -= 1;
+    if (state.total == 0) messageGroups.erase( gs );
+    else if (state.acquired == 0 && !state.owner.empty()) {
+        state.owner.clear();   // KAG TODO: need to invalidate consumer's positions?
+    }
+}
+
+void MessageGroupManager::consumerAdded( const Consumer& c )
+{
+    bool unique = consumers.insert( c.getName() ).second;
+    (void) unique; assert( unique );
+}
+
+void MessageGroupManager::consumerRemoved( const Consumer& c )
+{
+    size_t count = consumers.erase( c.getName() );
+    (void) count; assert( count == 1 );
+
+    bool needReset = false;
+    for (std::map<std::string, struct GroupState>::iterator gs = messageGroups.begin();
+         gs != messageGroups.end(); ++gs) {
+
+        GroupState& state( gs->second );
+        if (state.owner == c.getName()) {
+            state.owner.clear();
+            needReset = true;
+        }
+    }
+
+    if (needReset) {
+        // KAG TODO: How do I invalidate all consumers that need invalidating????
+    }
+}
+
+
+bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
+                                       const Mutex::ScopedLock& l)
+{
+    // KAG TODO: FIX!!!
+    return MessageSelector::nextMessage( c, next, l );
+}
+
+
+bool MessageGroupManager::canAcquire(Consumer::shared_ptr consumer, const QueuedMessage& qm,
+                                     const Mutex::ScopedLock&)
+{
+    std::string group( getGroupId(qm, groupIdHeader) );
+    std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+    assert( gs != messageGroups.end() );
+    GroupState& state( gs->second );
+
+    if (state.owner.empty()) {
+        state.owner = consumer->getName();
+        return true;
+    }
+    return state.owner == consumer->getName();
+}
+
+
+
+
+
+// default selector - requires messageLock to be held by caller!
+bool MessageSelector::nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
+                                   const Mutex::ScopedLock& /*just to enforce locking*/)
+{
+    Messages& messages(queue->getMessages());
+
+    if (messages.empty())
+        return false;
+
+    if (c->preAcquires()) {     // not browsing
+        next = messages.front();
+        return true;
+    } else if (messages.next(c->position, next))
+        return true;
+    return false;
+}
+
+
+// default selector - requires messageLock to be held by caller!
+bool MessageSelector::canAcquire(Consumer::shared_ptr, const QueuedMessage&,
+                                 const Mutex::ScopedLock& /*just to enforce locking*/)
+{
+    return true;    // always give permission to acquire
+}
+
+
+
+

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h Fri Aug  5 20:47:10 2011
@@ -59,8 +59,8 @@ class MessageStore;
 class QueueEvents;
 class QueueRegistry;
 class TransactionContext;
-class Exchange;
-
+class MessageSelector;
+ 
 /**
  * The brokers representation of an amqp queue. Messages are
  * delivered to a queue from where they can be dispatched to
@@ -129,10 +129,10 @@ class Queue : public boost::enable_share
     UsageBarrier barrier;
     int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
+    std::auto_ptr<MessageSelector> selector;
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);
-    bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
     bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
     ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
     bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
@@ -142,10 +142,16 @@ class Queue : public boost::enable_share
 
     bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
+    /** update queue observers with new message state */
     void enqueued(const QueuedMessage& msg);
+    void consumed(const QueuedMessage& msg);
     void dequeued(const QueuedMessage& msg);
-    void pop();
-    void popAndDequeue();
+
+    /** modify the Queue's message container - assumes messageLock held */
+    void pop();             // acquire front msg
+    void popAndDequeue();   // acquire and dequeue front msg
+    // acquire message @ position, return true and set msg if acquire succeeds
+    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg );
 
     void forcePersistent(QueuedMessage& msg);
     int getEventMode();
@@ -191,8 +197,15 @@ class Queue : public boost::enable_share
                              Broker* broker = 0);
     QPID_BROKER_EXTERN ~Queue();
 
+    /** allow the Consumer to consume or browse the next available message */
     QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
 
+    /** allow the Consumer to acquire a message that it has browsed.
+     * @param msg - message to be acquired.
+     * @return false if message is no longer available for acquire.
+     */
+    QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const Consumer::shared_ptr c);
+
     /**
      * Used to configure a new queue and create a persistent record
      * for it in store if required.
@@ -216,7 +229,11 @@ class Queue : public boost::enable_share
     bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
               const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
 
-    QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
+    /** Acquire the message at the given position if it is available for acquire.  Not to
+     * be used by clients, but used by the broker for queue management.
+     * @param message - set to the acquired message if true returned.
+     * @return true if the message has been acquired.
+     */
     QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
 
     /**
@@ -302,12 +319,12 @@ class Queue : public boost::enable_share
     bool isEnqueued(const QueuedMessage& msg);
 
     /**
-     * Gets the next available message
+     * Acquires the next available (oldest) message
      */
     QPID_BROKER_EXTERN QueuedMessage get();
 
-    /** Get the message at position pos */
-    QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const;
+    /** Get the message at position pos, returns true if found and sets msg */
+    QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
 
     const QueuePolicy* getPolicy();
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueEvents.cpp Fri Aug  5 20:47:10 2011
@@ -129,6 +129,10 @@ class EventGenerator : public QueueObser
     {
         if (!enqueueOnly) manager.dequeued(m);
     }
+
+    void consumed(const QueuedMessage&) {};
+    void requeued(const QueuedMessage&) {};
+
   private:
     QueueEvents& manager;
     const bool enqueueOnly;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Fri Aug  5 20:47:10 2011
@@ -377,11 +377,12 @@ void QueueFlowLimit::setState(const qpid
             ++i;
             fcmsg.add(first, last);
             for (SequenceNumber seq = first; seq <= last; ++seq) {
-                QueuedMessage msg(queue->find(seq));   // fyi: msg.payload may be null if msg is delivered & unacked
+                QueuedMessage msg;
+                bool found = queue->find(seq, msg);   // fyi: msg.payload may be null if msg is delivered & unacked
+                (void) found; assert(found);    // avoid unused variable warning when NDEBUG set
                 bool unique;
                 unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
-                // Like this to avoid tripping up unused variable warning when NDEBUG set
-                if (!unique) assert(unique);
+                (void) unique; assert(unique);  // ditto NDEBUG warning
             }
         }
     }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Fri Aug  5 20:47:10 2011
@@ -84,6 +84,9 @@ class Broker;
     QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
     /** the queue has removed QueuedMessage.  Returns true if flow state changes */
     QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
+    /** ignored */
+    QPID_BROKER_EXTERN void consumed(const QueuedMessage&) {};
+    QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {};
 
     /** for clustering: */
     QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h Fri Aug  5 20:47:10 2011
@@ -25,17 +25,49 @@ namespace qpid {
 namespace broker {
 
 struct QueuedMessage;
+class Consumer;
+
 /**
- * Interface for notifying classes who want to act as 'observers' of a
- * queue of particular events.
+ * Interface for notifying classes who want to act as 'observers' of a queue of particular
+ * events.
+ *
+ * The events that are monitored reflect the relationship between a particular message and
+ * the queue it has been delivered to.  A message can be considered in one of three states
+ * with respect to the queue:
+ *
+ * 1) "Available" - available for transfer to consumers,
+ * 2) "Locked" - to a particular consumer, no longer available for transfer, but not
+ * considered fully dequeued.
+ * 3) "Dequeued" - removed from the queue and no longer available to any consumer.
+ *
+ * The queue events that are observable are:
+ *
+ * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer
+ * (e.g. browse or acquire)
+ *
+ * "Consumed" - the message is "Locked" - a consumer has claimed exclusive access to it.
+ * It is no longer available for other consumers to browse or acquire, but it is not yet
+ * considered dequeued as it may be requeued by the consumer.
+ *
+ * "Requeued" - a previously-consumed message is 'unlocked': it is put back on the queue
+ * at its original position and returns to the "Available" state.
+ *
+ * "Dequeued" - a Locked message is no longer queued.  At this point, the queue no longer
+ * tracks the message, and the broker considers the consumer's transaction complete.
  */
 class QueueObserver
 {
   public:
     virtual ~QueueObserver() {}
+
+    // note: the Queue will hold the messageLock while calling these methods!
     virtual void enqueued(const QueuedMessage&) = 0;
+    virtual void consumed(const QueuedMessage&) = 0;
+    virtual void requeued(const QueuedMessage&) = 0;
     virtual void dequeued(const QueuedMessage&) = 0;
-  private:
+    virtual void consumerAdded( const Consumer& ) {};
+    virtual void consumerRemoved( const Consumer& ) {};
+ private:
 };
 }} // namespace qpid::broker
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Fri Aug  5 20:47:10 2011
@@ -269,8 +269,7 @@ bool RingQueuePolicy::checkLimit(boost::
 
     do {
         QueuedMessage oldest  = queue.front();
-
-        if (oldest.queue->acquire(oldest) || !strict) {
+        if (oldest.queue->acquireMessageAt(oldest.position, oldest) || !strict) {
             queue.pop_front();
             pendingDequeues.push_back(oldest);
             QPID_LOG(debug, "Ring policy triggered in " << name 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Aug  5 20:47:10 2011
@@ -269,9 +269,8 @@ SemanticState::ConsumerImpl::ConsumerImp
 
 
 ) :
-    Consumer(_acquire),
+    Consumer(_name, _acquire),
     parent(_parent),
-    name(_name),
     queue(_queue),
     ackExpected(ack),
     acquire(_acquire),
@@ -295,7 +294,7 @@ SemanticState::ConsumerImpl::ConsumerImp
 
         if (agent != 0)
         {
-            mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
+            mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getName(),
                                                 !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
             agent->addObject (mgmtObject);
             mgmtObject->set_creditMode("WINDOW");
@@ -327,16 +326,15 @@ bool SemanticState::ConsumerImpl::delive
 {
     assertClusterSafe();
     allocateCredit(msg.payload);
-    DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
+    DeliveryRecord record(msg, queue, getName(), acquire, !ackExpected, windowing);
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
     parent->deliver(record, sync);
-    if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
     if (windowing || ackExpected || !acquire) {
         parent->record(record);
     }
-    if (acquire && !ackExpected) {
-        queue->dequeue(0, msg);
+    if (acquire && !ackExpected) {  // auto acquire && auto accept
+        record.accept( 0 /*no ctxt*/ );
     }
     if (mgmtObject) { mgmtObject->inc_delivered(); }
     return true;
@@ -556,50 +554,61 @@ void SemanticState::deliver(DeliveryReco
     return deliveryAdapter.deliver(msg, sync);
 }
 
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
+const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
 {
-    ConsumerImplMap::iterator i = consumers.find(destination);
-    if (i == consumers.end()) {
-        throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+    ConsumerImpl::shared_ptr consumer;
+    if (!find(destination, consumer)) {
+        throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId()));
     } else {
-        return *(i->second);
+        return consumer;
+    }
+}
+
+bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const
+{
+    // @todo KAG gsim: shouldn't the consumers map be locked????
+    ConsumerImplMap::const_iterator i = consumers.find(destination);
+    if (i == consumers.end()) {
+        return false;
     }
+    consumer = i->second;
+    return true;
 }
 
 void SemanticState::setWindowMode(const std::string& destination)
 {
-    find(destination).setWindowMode();
+    find(destination)->setWindowMode();
 }
 
 void SemanticState::setCreditMode(const std::string& destination)
 {
-    find(destination).setCreditMode();
+    find(destination)->setCreditMode();
 }
 
 void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
 {
-    ConsumerImpl& c = find(destination);
-    c.addByteCredit(value);
-    c.requestDispatch();
+    ConsumerImpl::shared_ptr c = find(destination);
+    c->addByteCredit(value);
+    c->requestDispatch();
 }
 
 
 void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
 {
-    ConsumerImpl& c = find(destination);
-    c.addMessageCredit(value);
-    c.requestDispatch();
+    ConsumerImpl::shared_ptr c = find(destination);
+    c->addMessageCredit(value);
+    c->requestDispatch();
 }
 
 void SemanticState::flush(const std::string& destination)
 {
-    find(destination).flush();
+    find(destination)->flush();
 }
 
 
 void SemanticState::stop(const std::string& destination)
 {
-    find(destination).stop();
+    find(destination)->stop();
 }
 
 void SemanticState::ConsumerImpl::setWindowMode()
@@ -682,7 +691,7 @@ AckRange SemanticState::findRange(Delive
 void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired)
 {
     AckRange range = findRange(first, last);
-    for_each(range.start, range.end, AcquireFunctor(acquired));
+    for_each(range.start, range.end, AcquireFunctor(this, acquired));
 }
 
 void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedelivered)

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.h Fri Aug  5 20:47:10 2011
@@ -75,7 +75,6 @@ class SemanticState : private boost::non
     {
         mutable qpid::sys::Mutex lock;
         SemanticState* const parent;
-        const std::string name;
         const boost::shared_ptr<Queue> queue;
         const bool ackExpected;
         const bool acquire;
@@ -129,8 +128,6 @@ class SemanticState : private boost::non
 
         bool doOutput();
 
-        std::string getName() const { return name; }
-
         bool isAckExpected() const { return ackExpected; }
         bool isAcquire() const { return acquire; }
         bool isWindowing() const { return windowing; }
@@ -187,7 +184,8 @@ class SemanticState : private boost::non
     SessionContext& getSession() { return session; }
     const SessionContext& getSession() const { return session; }
 
-    ConsumerImpl& find(const std::string& destination);
+    const ConsumerImpl::shared_ptr find(const std::string& destination) const;
+    bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
     
     /**
      * Get named queue, never returns 0.

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/ThresholdAlerts.h?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/ThresholdAlerts.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/ThresholdAlerts.h Fri Aug  5 20:47:10 2011
@@ -50,6 +50,9 @@ class ThresholdAlerts : public QueueObse
                     const long repeatInterval);
     void enqueued(const QueuedMessage&);
     void dequeued(const QueuedMessage&);
+    void consumed(const QueuedMessage&) {};
+    void requeued(const QueuedMessage&) {};
+
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
                         const uint64_t countThreshold,
                         const uint64_t sizeThreshold,

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Aug  5 20:47:10 2011
@@ -409,11 +409,11 @@ void Connection::shadowSetUser(const std
 
 void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
 {
-    broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
-    c.position = position;
-    c.setBlocked(blocked);
-    if (notifyEnabled) c.enableNotify(); else c.disableNotify();
-    updateIn.consumerNumbering.add(c.shared_from_this());
+    broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
+    c->position = position;
+    c->setBlocked(blocked);
+    if (notifyEnabled) c->enableNotify(); else c->disableNotify();
+    updateIn.consumerNumbering.add(c);
 }
 
 
@@ -444,7 +444,7 @@ void Connection::outputTask(uint16_t cha
     if (!session)
         throw Exception(QPID_MSG(cluster << " channel not attached " << *this
                                  << "[" <<  channel << "] "));
-    OutputTask* task = &session->getSemanticState().find(name);
+    OutputTask* task = session->getSemanticState().find(name).get();
     connection->getOutputTasks().addOutputTask(task);
 }
 
@@ -534,7 +534,7 @@ void Connection::deliveryRecord(const st
             m.position = position;
             if (enqueued) queue->updateEnqueued(m); //inform queue of the message
         } else {                // Message at original position in original queue
-            m = queue->find(position);
+            queue->find(position, m);
         }
         if (!m.payload)
             throw Exception(QPID_MSG("deliveryRecord no update message"));

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp?rev=1154376&r1=1154375&r2=1154376&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp Fri Aug  5 20:47:10 2011
@@ -58,7 +58,7 @@ public:
 
     intrusive_ptr<Message> last;
     bool received;
-    TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+    TestConsumer(bool acquire = true):Consumer("test", acquire), received(false) {};
 
     virtual bool deliver(QueuedMessage& msg){
         last = msg.payload;
@@ -324,14 +324,18 @@ QPID_AUTO_TEST_CASE(testSearch){
     queue->deliver(msg3);
 
     SequenceNumber seq(2);
-    QueuedMessage qm = queue->find(seq);
+    QueuedMessage qm;
+    TestConsumer::shared_ptr c1(new TestConsumer());
+
+    BOOST_CHECK(queue->find(seq, qm));
 
     BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
 
-    queue->acquire(qm);
+    queue->acquire(qm, c1);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
     SequenceNumber seq1(3);
-    QueuedMessage qm1 = queue->find(seq1);
+    QueuedMessage qm1;
+    BOOST_CHECK(queue->find(seq1, qm1));
     BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
 
 }
@@ -551,12 +555,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
     framing::SequenceNumber sequence1(10);
     QueuedMessage qmsg3(queue.get(), 0, sequence1);
+    TestConsumer::shared_ptr dummy(new TestConsumer());
 
-    BOOST_CHECK(!queue->acquire(qmsg));
-    BOOST_CHECK(queue->acquire(qmsg2));
+    BOOST_CHECK(!queue->acquire(qmsg, dummy));
+    BOOST_CHECK(queue->acquire(qmsg2, dummy));
     // Acquire the massage again to test failure case.
-    BOOST_CHECK(!queue->acquire(qmsg2));
-    BOOST_CHECK(!queue->acquire(qmsg3));
+    BOOST_CHECK(!queue->acquire(qmsg2, dummy));
+    BOOST_CHECK(!queue->acquire(qmsg3, dummy));
 
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message